From 2676fb683da99d2a452f263f18bb69429f01fb73 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Thu, 8 Aug 2019 09:50:22 -0400 Subject: [PATCH 01/24] Added base agent class and updated config/client settings --- src/prefect/__init__.py | 2 + src/prefect/agent/__init__.py | 4 + src/prefect/agent/agent.py | 197 ++++++++++++++++++++++++++++++++++ src/prefect/client/client.py | 6 +- src/prefect/config.toml | 4 + 5 files changed, 211 insertions(+), 2 deletions(-) create mode 100644 src/prefect/agent/__init__.py create mode 100644 src/prefect/agent/agent.py diff --git a/src/prefect/__init__.py b/src/prefect/__init__.py index c630bd8ee26f..40dc339e42d1 100644 --- a/src/prefect/__init__.py +++ b/src/prefect/__init__.py @@ -16,6 +16,8 @@ import prefect.serialization +import prefect.agent + from ._version import get_versions __version__ = get_versions()["version"] # type: ignore diff --git a/src/prefect/agent/__init__.py b/src/prefect/agent/__init__.py new file mode 100644 index 000000000000..d7ccd70a3fc5 --- /dev/null +++ b/src/prefect/agent/__init__.py @@ -0,0 +1,4 @@ +from prefect.agent.agent import Agent + +# import prefect_agent.kubernetes +# import prefect_agent.nomad diff --git a/src/prefect/agent/agent.py b/src/prefect/agent/agent.py new file mode 100644 index 000000000000..5bc64923910d --- /dev/null +++ b/src/prefect/agent/agent.py @@ -0,0 +1,197 @@ +import logging +import os +import pendulum +import time + +from prefect import config +from prefect.client import Client +from prefect.serialization import state +from prefect.engine.state import Submitted +from prefect.utilities.graphql import with_args + + +class Agent: + """ + Base class for Agents. + + This Agent class is a standard point for interacting with Prefect Cloud. It is meant + to have subclasses which inherit functionality from this class. The only piece that + the subclasses should implement is the `deploy_flows` function. It is built in this + way to keep Prefect Cloud logic standard but allows for platform specific + customizability. + + In order for this to operate `PREFECT__CLOUD__AGENT__AUTH_TOKEN` must be set as an + environment variable. + """ + + def __init__(self) -> None: + # self.auth_token = config.cloud.agent.get("auth_token") + self.loop_interval = config.cloud.agent.get("loop_interval") + + self.client = Client(token=config.cloud.agent.get("auth_token")) + + logger = logging.getLogger("agent") + logger.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ch.setFormatter(formatter) + logger.addHandler(ch) + + self.logger = logger + + def start(self) -> None: + """ + The main entrypoint to the agent. This function loops and constantly polls for + new flow runs to deploy + """ + self.logger.info(f"Starting {type(self).__name__}") + tenant_id = self.query_tenant_id() + + while True: + try: + flow_runs = self.query_flow_runs(tenant_id=tenant_id) + self.logger.info( + f"Found {len(flow_runs)} flow run(s) to submit for execution." + ) + + self.update_states(flow_runs) + self.deploy_flows(flow_runs) + self.logger.info( + f"Submitted {len(flow_runs)} flow run(s) for execution." + ) + except Exception as exc: + self.logger.error(exc) + time.sleep(self.loop_interval) + + def query_tenant_id(self) -> str: + """ + Query Prefect Cloud for the tenant id that corresponds to the agent's auth token + + Returns: + - str: The current tenant id + """ + query = {"query": {"tenant": {"id"}}} + result = self.client.graphql(query) + return result.data.tenant[0].id # type: ignore + + def query_flow_runs(self, tenant_id: str) -> list: + """ + Query Prefect Cloud for flow runs which need to be deployed and executed + + Args: + - tenant_id (str): The tenant id to use in the query + + Returns: + - list: A list of GraphQLResult flow run objects + """ + + # Get scheduled flow runs from queue + mutation = { + "mutation($input: getRunsInQueueInput!)": { + "getRunsInQueue(input: $input)": {"flow_run_ids"} + } + } + + result = self.client.graphql( + mutation, variables={"input": {"tenantId": tenant_id}} + ) + flow_run_ids = result.data.getRunsInQueue.flow_run_ids # type: ignore + now = pendulum.now("UTC") + + # Query metadata fow flow runs found in queue + query = { + "query": { + with_args( + "flow_run", + { + # match flow runs in the flow_run_ids list + "where": { + "id": {"_in": flow_run_ids}, + "_or": [ + # who are EITHER scheduled... + {"state": {"_eq": "Scheduled"}}, + # OR running with task runs scheduled to start more than 3 seconds ago + { + "state": {"_eq": "Running"}, + "task_runs": { + "state_start_time": { + "_lte": str(now.subtract(seconds=3)) + } + }, + }, + ], + } + }, + ): { + "id": True, + "version": True, + "tenant_id": True, + "state": True, + "serialized_state": True, + "parameters": True, + "flow": {"id", "name", "environment", "storage"}, + with_args( + "task_runs", + { + "where": { + "state_start_time": { + "_lte": str(now.subtract(seconds=3)) + } + } + }, + ): {"id", "version", "task_id", "serialized_state"}, + } + } + } + + result = self.client.graphql(query) + return result.data.flow_run # type: ignore + + def update_states(self, flow_runs: list) -> None: + """ + After a flow run is grabbed this function sets the state to Submitted so it + won't be picked up by any other processes + + Args: + - flow_runs (list): A list of GraphQLResult flow run objects + """ + for flow_run in flow_runs: + + # Set flow run state to `Submitted` if it is currently `Scheduled` + if state.StateSchema().load(flow_run.serialized_state).is_scheduled(): + self.client.set_flow_run_state( + flow_run_id=flow_run.id, + version=flow_run.version, + state=Submitted( + message="Submitted for execution", + state=state.StateSchema().load(flow_run.serialized_state), + ), + ) + + # Set task run states to `Submitted` if they are currently `Scheduled` + for task_run in flow_run.task_runs: + if state.StateSchema().load(task_run.serialized_state).is_scheduled(): + self.client.set_task_run_state( + task_run_id=task_run.id, + version=task_run.version, + state=Submitted( + message="Submitted for execution", + state=state.StateSchema().load(task_run.serialized_state), + ), + ) + + def deploy_flows(self, flow_runs: list) -> None: + """ + Meant to be overridden by a platform specific deployment option + + Args: + - flow_runs (list): A list of GraphQLResult flow run objects + """ + pass + + +if __name__ == "__main__": + Agent().start() diff --git a/src/prefect/client/client.py b/src/prefect/client/client.py index a024a4532a90..9118dd265ce7 100644 --- a/src/prefect/client/client.py +++ b/src/prefect/client/client.py @@ -62,15 +62,17 @@ class Client: Args: - graphql_server (str, optional): the URL to send all GraphQL requests to; if not provided, will be pulled from `cloud.graphql` config var + - token (str, optional): a Prefect Cloud auth token for communication; if not + provided, will be pulled from `cloud.auth_token` config var """ - def __init__(self, graphql_server: str = None): + def __init__(self, graphql_server: str = None, token: str = None): if not graphql_server: graphql_server = prefect.config.cloud.get("graphql") self.graphql_server = graphql_server - token = prefect.config.cloud.get("auth_token", None) + token = token or prefect.config.cloud.get("auth_token", None) self.token_is_local = False if token is None: diff --git a/src/prefect/config.toml b/src/prefect/config.toml index 2e270208161e..de6a10f60c75 100644 --- a/src/prefect/config.toml +++ b/src/prefect/config.toml @@ -9,6 +9,10 @@ graphql = "${cloud.api}/graphql/alpha" use_local_secrets = true heartbeat_interval = 30.0 + [cloud.agent] + # Agents require different API tokens + auth_token = "" + loop_interval = 5 [logging] # The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL From b9677ac4c018696f38054a348cdc4ac6d9a0232e Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Thu, 8 Aug 2019 10:18:36 -0400 Subject: [PATCH 02/24] Added Kubernetes agent --- src/prefect/agent/__init__.py | 7 +- src/prefect/agent/kubernetes/README.md | 11 + src/prefect/agent/kubernetes/__init__.py | 2 + src/prefect/agent/kubernetes/agent.py | 97 ++++++ src/prefect/agent/kubernetes/deployment.yaml | 55 ++++ src/prefect/agent/kubernetes/job_spec.yaml | 47 +++ .../agent/kubernetes/resource_manager.py | 282 ++++++++++++++++++ src/prefect/config.toml | 4 + 8 files changed, 504 insertions(+), 1 deletion(-) create mode 100644 src/prefect/agent/kubernetes/README.md create mode 100644 src/prefect/agent/kubernetes/__init__.py create mode 100644 src/prefect/agent/kubernetes/agent.py create mode 100644 src/prefect/agent/kubernetes/deployment.yaml create mode 100644 src/prefect/agent/kubernetes/job_spec.yaml create mode 100644 src/prefect/agent/kubernetes/resource_manager.py diff --git a/src/prefect/agent/__init__.py b/src/prefect/agent/__init__.py index d7ccd70a3fc5..682c4cdd3596 100644 --- a/src/prefect/agent/__init__.py +++ b/src/prefect/agent/__init__.py @@ -1,4 +1,9 @@ from prefect.agent.agent import Agent -# import prefect_agent.kubernetes +try: + import prefect.agent.kubernetes +except ImportError: + raise ImportError( + 'Using `prefect.agent.kubernetes` requires Prefect to be installed with the "kubernetes" extra.' + ) # import prefect_agent.nomad diff --git a/src/prefect/agent/kubernetes/README.md b/src/prefect/agent/kubernetes/README.md new file mode 100644 index 000000000000..bbe98e3bf769 --- /dev/null +++ b/src/prefect/agent/kubernetes/README.md @@ -0,0 +1,11 @@ +# k8s-agent + +The Prefect Kubernetes agent that turns a cluster into a workflow orchestration system. + +If running on GKE you may need to execute: `kubectl create clusterrolebinding default-admin --clusterrole cluster-admin --serviceaccount=default:default` + +Quick Start: + +- Build Dockerfile and push to registry +- Update the `image` and `PREFECT__CLOUD__AUTH_TOKEN` env value in the deployment yaml +- Run `kubectl apply -f deployment.yaml` diff --git a/src/prefect/agent/kubernetes/__init__.py b/src/prefect/agent/kubernetes/__init__.py new file mode 100644 index 000000000000..d4877b5ac645 --- /dev/null +++ b/src/prefect/agent/kubernetes/__init__.py @@ -0,0 +1,2 @@ +from prefect.agent.kubernetes.agent import KubernetesAgent +from prefect.agent.kubernetes.resource_manager import ResourceManager diff --git a/src/prefect/agent/kubernetes/agent.py b/src/prefect/agent/kubernetes/agent.py new file mode 100644 index 000000000000..122180b66b2e --- /dev/null +++ b/src/prefect/agent/kubernetes/agent.py @@ -0,0 +1,97 @@ +import logging +import os +from os import path +import uuid + +from kubernetes import client, config +import yaml + +from prefect.agent import Agent +from prefect.environments.storage import Docker +from prefect.serialization.storage import StorageSchema +from prefect.utilities.graphql import GraphQLResult + + +class KubernetesAgent(Agent): + """ + Agent which deploys flow runs as Kubernetes jobs. Currently this is required to either + run on a k8s cluster or on a local machine where the kube_config is pointing at the + desired cluster. + """ + + def __init__(self) -> None: + super().__init__() + + try: + config.load_incluster_config() + except config.config_exception.ConfigException as exc: + self.logger.warning(f"{exc} Using out of cluster configuration option.") + config.load_kube_config() + + def deploy_flows(self, flow_runs: list) -> None: + """ + Deploy flow runs on to a k8s cluster as jobs + + Args: + - flow_runs (list): A list of GraphQLResult flow run objects + """ + batch_client = client.BatchV1Api() + + for flow_run in flow_runs: + + # Require Docker storage + if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker): + self.logger.error( + f"Storage for flow run {flow_run.id} is not of type Docker." + ) + continue + + job_spec = self.replace_job_spec_yaml(flow_run) + + batch_client.create_namespaced_job( + namespace=os.getenv("NAMESPACE", "default"), body=job_spec + ) + + def replace_job_spec_yaml(self, flow_run: GraphQLResult) -> dict: + """ + Populate metadata and variables in the job_spec.yaml file for flow runs + + Args: + - flow_run (GraphQLResult): A flow run objects + + Returns: + - dict: a dictionary representing the populated yaml object + """ + with open(path.join(path.dirname(__file__), "job_spec.yaml"), "r") as job_file: + job = yaml.safe_load(job_file) + + identifier = str(uuid.uuid4())[:8] + job_name = "prefect-job-{}".format(identifier) + + # Populate job metadata for identification + job["metadata"]["name"] = job_name + job["metadata"]["labels"]["app"] = job_name + job["metadata"]["labels"]["identifier"] = identifier + job["metadata"]["labels"]["flow_run_id"] = flow_run.id + job["metadata"]["labels"]["flow_id"] = flow_run.flow.id + job["spec"]["template"]["metadata"]["labels"]["app"] = job_name + job["spec"]["template"]["metadata"]["labels"]["identifier"] = identifier + + # Use flow storage image for job + job["spec"]["template"]["spec"]["containers"][0]["image"] = ( + StorageSchema().load(flow_run.flow.storage).name + ) + + # Populate environment variables for flow run execution + env = job["spec"]["template"]["spec"]["containers"][0]["env"] + + env[0]["value"] = os.getenv("PREFECT__CLOUD__API", "https://api.prefect.io") + env[1]["value"] = os.environ["PREFECT__CLOUD__AGENT__AUTH_TOKEN"] + env[2]["value"] = flow_run.id + env[3]["value"] = os.getenv("NAMESPACE", "default") + + return job + + +if __name__ == "__main__": + KubernetesAgent().start() diff --git a/src/prefect/agent/kubernetes/deployment.yaml b/src/prefect/agent/kubernetes/deployment.yaml new file mode 100644 index 000000000000..ccf0f12e489a --- /dev/null +++ b/src/prefect/agent/kubernetes/deployment.yaml @@ -0,0 +1,55 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: prefect-agent + labels: + app: prefect-agent +spec: + replicas: 1 + selector: + matchLabels: + app: prefect-agent + template: + metadata: + labels: + app: prefect-agent + spec: + containers: + - name: agent + image: prefecthq/prefect:latest + imagePullPolicy: IfNotPresent + command: ["/bin/bash", "-c"] + args: ["prefect agent start kubernetes-agent"] # COMMAND TBD + env: + - name: PREFECT__CLOUD__AGENT__AUTH_TOKEN + value: "" + - name: PREFECT__CLOUD__API + value: "https://api.prefect.io" + - name: PREFECT__CLOUD__AGENT__LOOP_INTERVAL + value: "5" + - name: NAMESPACE + value: "default" + resources: + limits: + memory: "128Mi" + cpu: "100m" + - name: resource-manager + image: prefecthq/prefect:latest + imagePullPolicy: IfNotPresent + command: ["/bin/bash", "-c"] + args: ["prefect agent start kubernetes-resource-manager"] # COMMAND TBD + env: + - name: PREFECT__CLOUD__AGENT__AUTH_TOKEN + value: "" + - name: PREFECT__CLOUD__API + value: "https://api.prefect.io" + - name: PREFECT__CLOUD__AGENT__RESOURCE_MANAGER__LOOP_INTERVAL + value: "60" + - name: NAMESPACE + value: "default" + resources: + limits: + memory: "128Mi" + cpu: "100m" + imagePullSecrets: + - name: "" diff --git a/src/prefect/agent/kubernetes/job_spec.yaml b/src/prefect/agent/kubernetes/job_spec.yaml new file mode 100644 index 000000000000..3c016bcc4ac9 --- /dev/null +++ b/src/prefect/agent/kubernetes/job_spec.yaml @@ -0,0 +1,47 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: prefect-job-UUID + labels: + app: prefect-job-UUID + identifier: UUID +spec: + template: + metadata: + labels: + app: prefect-job-UUID + identifier: UUID + spec: + containers: + - name: flow + image: prefecthq/prefect:latest + imagePullPolicy: Always + command: ["/bin/sh", "-c"] + args: ["prefect execute cloud-flow"] + env: + - name: PREFECT__CLOUD__API + value: PREFECT__CLOUD__API + - name: PREFECT__CLOUD__AUTH_TOKEN + value: PREFECT__CLOUD__AUTH_TOKEN + - name: PREFECT__CONTEXT__FLOW_RUN_ID + value: PREFECT__CONTEXT__FLOW_RUN_ID + - name: PREFECT__CONTEXT__NAMESPACE + value: PREFECT__CONTEXT__NAMESPACE + - name: PREFECT__CLOUD__USE_LOCAL_SECRETS + value: "false" + - name: PREFECT__LOGGING__LOG_TO_CLOUD + value: "true" + - name: PREFECT__LOGGING__LEVEL + value: "DEBUG" + - name: PREFECT__CLOUD__USE_LOCAL_SECRETS + value: "false" + - name: PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS + value: "prefect.engine.cloud.CloudFlowRunner" + - name: PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS + value: "prefect.engine.cloud.CloudTaskRunner" + resources: + requests: + cpu: "100m" + limits: + cpu: "100m" + restartPolicy: Never diff --git a/src/prefect/agent/kubernetes/resource_manager.py b/src/prefect/agent/kubernetes/resource_manager.py new file mode 100644 index 000000000000..8752cb94ac25 --- /dev/null +++ b/src/prefect/agent/kubernetes/resource_manager.py @@ -0,0 +1,282 @@ +import logging +import os +import time + +from kubernetes import client, config +import pendulum + +from prefect import Client + + +class ResourceManager: + """ + The resource manager is responsible for cleaning up old completed/failed k8s jobs + and pods from the cluster. This is optional and does not need to me used for the agent + to work. + """ + + def __init__(self) -> None: + self.loop_interval = config.cloud.agent.resource_manager.get("loop_interval") + self.client = Client(token=config.cloud.agent.get("auth_token")) + self.namespace = os.getenv("NAMESPACE", "default") + + logger = logging.getLogger("resource-manager") + logger.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ch.setFormatter(formatter) + logger.addHandler(ch) + + self.logger = logger + + try: + config.load_incluster_config() + except config.config_exception.ConfigException as exc: + self.logger.warning(f"{exc} Using out of cluster configuration option.") + config.load_kube_config() + + def start(self) -> None: + """ + Main loop which waits on a `LOOP_INTERVAL` and looks for finished jobs to clean + """ + self.logger.info(f"Starting {type(self).__name__}") + while True: + try: + self.clean_resources() + except Exception as exc: + self.logger.error(exc) + time.sleep(self.loop_interval) + + # IDENTIFICATION + + def clean_resources(self) -> None: + """ + Find jobs that are either completed or failed to delete from the cluster + """ + batch_client = client.BatchV1Api() + + try: + jobs = batch_client.list_namespaced_job(namespace=self.namespace) + except client.rest.ApiException: + self.logger.error( + f"Error attempting to list jobs in namespace {self.namespace}" + ) + return + + for job in jobs.items: + if job.status.succeeded or job.status.failed: + + identifier = job.metadata.labels.get("identifier") + name = job.metadata.name + + if job.status.failed: + self.logger.info( + f"Found failed job {name} in namespace {self.namespace}" + ) + self.report_failed_job(identifier=identifier) + + self.delete_job(name=name) + self.delete_pods(job_name=name, identifier=identifier) + + if not jobs.items: + self.clean_extra_pods() + + def clean_extra_pods(self) -> None: + """ + Any runaway pods which failed due to unexpected reasons will be cleaned up here. + ImagePullBackoffs, Evictions, etc... + """ + core_client = client.CoreV1Api() + + try: + pods = core_client.list_namespaced_pod(namespace=self.namespace) + except client.rest.ApiException: + self.logger.error( + f"Error attempting to list pods in namespace {self.namespace}" + ) + return + + for pod in pods.items: + phase = pod.status.phase + if phase != "Running": + + name = pod.metadata.name + + if phase == "Failed": + self.report_failed_pod(pod=pod) + + if phase == "Unknown": + self.report_unknown_pod(pod=pod) + + if phase == "Pending": + if pod.status.container_statuses: + self.report_pod_image_pull_error(pod=pod) + + self.delete_extra_pod(name=name) + + # DELETION + + def delete_job(self, name: str) -> None: + """ + Delete a job based on the name + """ + batch_client = client.BatchV1Api() + self.logger.info(f"Deleting job {name} in namespace {self.namespace}") + + try: + batch_client.delete_namespaced_job( + name=name, namespace=self.namespace, body=client.V1DeleteOptions() + ) + except client.rest.ApiException: + self.logger.error( + f"Error attempting to delete job {name} in namespace {self.namespace}" + ) + + def delete_pods(self, job_name: str, identifier: str) -> None: + """ + Delete a pod based on the job name and identifier + """ + core_client = client.CoreV1Api() + try: + pods = core_client.list_namespaced_pod( + namespace=self.namespace, + label_selector="identifier={}".format(identifier), + ) + except client.rest.ApiException: + self.logger.error( + f"Error attempting to list pods in namespace {self.namespace}" + ) + return + + if pods: + self.logger.info( + f"Deleting {len(pods.items)} pods for job {job_name} in namespace {self.namespace}" + ) + for pod in pods.items: + name = pod.metadata.name + + try: + core_client.delete_namespaced_pod( + name=name, namespace=self.namespace, body=client.V1DeleteOptions() + ) + except client.rest.ApiException: + self.logger.error( + f"Error attempting to delete pod {name} in namespace {self.namespace}" + ) + + def delete_extra_pod(self, name: str) -> None: + """ + Delete a pod based on the name + """ + core_client = client.CoreV1Api() + self.logger.info(f"Deleting extra pod {name} in namespace {self.namespace}") + + try: + core_client.delete_namespaced_pod( + name=name, namespace=self.namespace, body=client.V1DeleteOptions() + ) + except client.rest.ApiException: + self.logger.error( + f"Error attempting to delete pod {name} in namespace {self.namespace}" + ) + + # REPORTING + + def report_failed_job(self, identifier: str) -> None: + """ + Report jobs that failed for reasons outside of a flow run + """ + core_client = client.CoreV1Api() + try: + pods = core_client.list_namespaced_pod( + namespace=self.namespace, + label_selector="identifier={}".format(identifier), + ) + except client.rest.ApiException: + self.logger.error( + f"Error attempting to list pods in namespace {self.namespace}" + ) + return + + for pod in pods.items: + phase = pod.status.phase + if phase == "Failed": + self.report_failed_pod(pod) + + def report_failed_pod(self, pod: client.V1Pod) -> None: + """ + Report pods that failed for reasons outside of a flow run. Write cloud log + """ + core_client = client.CoreV1Api() + name = pod.metadata.name + + if pod.status.reason == "Evicted": + logs = "Pod was evicted due to cluster resource constraints / auto scaling." + else: + try: + logs = core_client.read_namespaced_pod_log( + namespace=self.namespace, name=name + ) + except client.rest.ApiException: + self.logger.error( + f"Error attempting to read pod logs for {name} in namespace {self.namespace}" + ) + return + + self.logger.info(f"Reporting failed pod {name} in namespace {self.namespace}") + + self.client.write_run_log( + flow_run_id=pod.metadata.labels.get("flow_run_id"), + task_run_id="", + timestamp=str(pendulum.now()), + name="resource-manager", + message=logs, + level="ERROR", + info={}, + ) + + def report_unknown_pod(self, pod: client.V1Pod) -> None: + """ + Write cloud log of pods that entered unknonw states + """ + name = pod.metadata.name + self.logger.info(f"Reporting unknown pod {name} in namespace {self.namespace}") + + self.client.write_run_log( + flow_run_id=pod.metadata.labels.get("flow_run_id"), + task_run_id="", + timestamp=str(pendulum.now()), + name="resource-manager", + message=f"Flow run pod {name} entered an unknown state in namespace {self.namespace}", + level="ERROR", + info={}, + ) + + def report_pod_image_pull_error(self, pod: client.V1Pod) -> None: + """ + Write cloud log of pods that ahd image pull errors + """ + for status in pod.status.container_statuses: + waiting = status.state.waiting + + if waiting and waiting.reason == "ImagePullBackoff": + self.logger.info( + f"Reporting image pull error for pod {pod.metadata.name} in namespace {self.namespace}" + ) + + self.client.write_run_log( + flow_run_id=pod.metadata.labels.get("flow_run_id"), + task_run_id="", + timestamp=str(pendulum.now()), + name="resource-manager", + message=f"Flow run image pull error for pod {pod.metadata.name} in namespace {self.namespace}", + level="ERROR", + info={}, + ) + + +if __name__ == "__main__": + ResourceManager().start() diff --git a/src/prefect/config.toml b/src/prefect/config.toml index de6a10f60c75..ace5215907ee 100644 --- a/src/prefect/config.toml +++ b/src/prefect/config.toml @@ -14,6 +14,10 @@ heartbeat_interval = 30.0 auth_token = "" loop_interval = 5 + [cloud.agent.resource_manager] + # Separate loop interval for resource managers + loop_interval = 60 + [logging] # The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL level = "INFO" From adcb3edad21d36a95d0d9c19041b5774e995b1ae Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 12:08:14 -0400 Subject: [PATCH 03/24] Add Nomad agent --- src/prefect/agent/__init__.py | 3 +- src/prefect/agent/kubernetes/agent.py | 8 +-- src/prefect/agent/nomad/README.md | 0 src/prefect/agent/nomad/__init__.py | 1 + src/prefect/agent/nomad/agent.py | 81 ++++++++++++++++++++++++ src/prefect/agent/nomad/deployment.nomad | 48 ++++++++++++++ src/prefect/agent/nomad/job_spec.nomad | 41 ++++++++++++ 7 files changed, 177 insertions(+), 5 deletions(-) create mode 100644 src/prefect/agent/nomad/README.md create mode 100644 src/prefect/agent/nomad/__init__.py create mode 100644 src/prefect/agent/nomad/agent.py create mode 100644 src/prefect/agent/nomad/deployment.nomad create mode 100644 src/prefect/agent/nomad/job_spec.nomad diff --git a/src/prefect/agent/__init__.py b/src/prefect/agent/__init__.py index 682c4cdd3596..efbdc315d2ec 100644 --- a/src/prefect/agent/__init__.py +++ b/src/prefect/agent/__init__.py @@ -6,4 +6,5 @@ raise ImportError( 'Using `prefect.agent.kubernetes` requires Prefect to be installed with the "kubernetes" extra.' ) -# import prefect_agent.nomad + +import prefect_agent.nomad diff --git a/src/prefect/agent/kubernetes/agent.py b/src/prefect/agent/kubernetes/agent.py index 122180b66b2e..54fa7f612809 100644 --- a/src/prefect/agent/kubernetes/agent.py +++ b/src/prefect/agent/kubernetes/agent.py @@ -72,14 +72,14 @@ def replace_job_spec_yaml(self, flow_run: GraphQLResult) -> dict: job["metadata"]["name"] = job_name job["metadata"]["labels"]["app"] = job_name job["metadata"]["labels"]["identifier"] = identifier - job["metadata"]["labels"]["flow_run_id"] = flow_run.id - job["metadata"]["labels"]["flow_id"] = flow_run.flow.id + job["metadata"]["labels"]["flow_run_id"] = flow_run.id # type: ignore + job["metadata"]["labels"]["flow_id"] = flow_run.flow.id # type: ignore job["spec"]["template"]["metadata"]["labels"]["app"] = job_name job["spec"]["template"]["metadata"]["labels"]["identifier"] = identifier # Use flow storage image for job job["spec"]["template"]["spec"]["containers"][0]["image"] = ( - StorageSchema().load(flow_run.flow.storage).name + StorageSchema().load(flow_run.flow.storage).name # type: ignore ) # Populate environment variables for flow run execution @@ -87,7 +87,7 @@ def replace_job_spec_yaml(self, flow_run: GraphQLResult) -> dict: env[0]["value"] = os.getenv("PREFECT__CLOUD__API", "https://api.prefect.io") env[1]["value"] = os.environ["PREFECT__CLOUD__AGENT__AUTH_TOKEN"] - env[2]["value"] = flow_run.id + env[2]["value"] = flow_run.id # type: ignore env[3]["value"] = os.getenv("NAMESPACE", "default") return job diff --git a/src/prefect/agent/nomad/README.md b/src/prefect/agent/nomad/README.md new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/prefect/agent/nomad/__init__.py b/src/prefect/agent/nomad/__init__.py new file mode 100644 index 000000000000..38e7b0fab39f --- /dev/null +++ b/src/prefect/agent/nomad/__init__.py @@ -0,0 +1 @@ +from prefect.agent.nomad.agent import NomadAgent diff --git a/src/prefect/agent/nomad/agent.py b/src/prefect/agent/nomad/agent.py new file mode 100644 index 000000000000..bc866656ec8e --- /dev/null +++ b/src/prefect/agent/nomad/agent.py @@ -0,0 +1,81 @@ +import json +import os +from os import path +import uuid + +import requests + +from prefect.agent import Agent +from prefect.environments.storage import Docker +from prefect.serialization.storage import StorageSchema +from prefect.utilities.graphql import GraphQLResult + + +class NomadAgent(Agent): + """ + Agent which deploys flow runs as Nomad jobs to a Nomad cluster based on the + `NOMAD_HOST` environment variable. + """ + + def __init__(self) -> None: + super().__init__() + + def deploy_flows(self, flow_runs: list) -> None: + """ + Deploy flow runs on to a Nomad cluster as jobs + + Args: + - flow_runs (list): A list of GraphQLResult flow run objects + """ + for flow_run in flow_runs: + + if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker): + self.logger.error( + f"Storage for flow run {flow_run.id} is not of type Docker." + ) + continue + + job_spec = self.replace_job_spec_json(flow_run) + nomad_host = os.getenv("NOMAD_HOST", "http://127.0.0.1:4646") + requests.post(path.join(nomad_host, "v1/jobs"), json=job_spec) + + def replace_job_spec_json(self, flow_run: GraphQLResult) -> dict: + """ + Populate metadata and variables in the job_spec.nomad file for flow runs + + Args: + - flow_run (GraphQLResult): A flow run objects + + Returns: + - dict: a dictionary representing the populated json object + """ + with open(path.join(path.dirname(__file__), "job_spec.nomad"), "r") as job_file: + job = json.load(job_file) + + job["Job"]["ID"] = flow_run.id # type: ignore + job["Job"]["Name"] = "prefect-job-{}".format(str(uuid.uuid4())[:8]) + + job["Job"]["TaskGroups"][0]["Name"] = "prefect-job-{}".format( + flow_run.id # type: ignore + ) + job["Job"]["TaskGroups"][0]["Tasks"][0]["Name"] = flow_run.id # type: ignore + + job["Job"]["TaskGroups"][0]["Tasks"][0]["Config"]["image"] = ( + StorageSchema().load(flow_run.flow.storage).name # type: ignore + ) + + env = job["Job"]["TaskGroups"][0]["Tasks"][0]["Env"] + env["PREFECT__CLOUD__API"] = os.getenv( + "PREFECT__CLOUD__API", "https://api.prefect.io" + ) + env["PREFECT__CLOUD__AGENT__AUTH_TOKEN"] = os.environ[ + "PREFECT__CLOUD__AGENT__AUTH_TOKEN" + ] + env["PREFECT__CONTEXT__FLOW_RUN_ID"] = flow_run.id # type: ignore + env["PREFECT__CONTEXT__NAMESPACE"] = os.getenv("NAMESPACE", "default") + + return job + + +if __name__ == "__main__": + NomadAgent().start() diff --git a/src/prefect/agent/nomad/deployment.nomad b/src/prefect/agent/nomad/deployment.nomad new file mode 100644 index 000000000000..8c53f324eaba --- /dev/null +++ b/src/prefect/agent/nomad/deployment.nomad @@ -0,0 +1,48 @@ +job "prefect-agent" { + # region = "global" + + datacenters = ["dc1"] +# datacenters = ["us-east-1"] + + type = "service" + + # eventually we might consider placing the resource manager in this group as well + group "prefect" { + count = 1 + + restart { + attempts = 5 + delay = "5s" + interval = "30s" + mode = "fail" + } + + task "agent" { + # The "driver" parameter specifies the task driver that should be used to + # run the task. + driver = "docker" + + # config for docker + config { + image = "IMAGE" + command = "/bin/bash" + args = ["-c", "prefect agent start nomad-agent"] + force_pull = true + } + + env { + NOMAD_HOST = "http://127.0.0.1:4646" + PREFECT__CLOUD__AGENT__AUTH_TOKEN = "TOKEN" + PREFECT__CLOUD__API = "https://api.prefect.io" + LOOP_INTERVAL = "5" + NAMESPACE = "default" + } + + resources { + cpu = 200 + memory = 128 + } + + } + } +} diff --git a/src/prefect/agent/nomad/job_spec.nomad b/src/prefect/agent/nomad/job_spec.nomad new file mode 100644 index 000000000000..1967702bb638 --- /dev/null +++ b/src/prefect/agent/nomad/job_spec.nomad @@ -0,0 +1,41 @@ +{ + "Job": { + "ID": "prefect-job-UUID", + "Name": "prefect-job-UUID", + "Type": "batch", + "Datacenters": [ + "dc1" + ], + "TaskGroups": [{ + "Name": "flow", + "Count": 1, + "Tasks": [{ + "Name": "flow", + "Driver": "docker", + "Env": { + "PREFECT__CLOUD__API": "XX", + "PREFECT__CLOUD__AUTH_TOKEN": "XX", + "PREFECT__CONTEXT__FLOW_RUN_ID": "XX", + "PREFECT__CONTEXT__NAMESPACE": "XX", + "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false", + "PREFECT__LOGGING__LOG_TO_CLOUD": "true", + "PREFECT__LOGGING__LEVEL": "DEBUG", + "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false", + "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner", + "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner" + }, + "Config": { + "image": "prefecthq/prefect:latest", + "command": "/bin/bash", + "args": ["-c", "prefect execute cloud-flow"] + }, + "Resources": { + "CPU": 100 + } + }], + "RestartPolicy": { + "Attempts": 0 + } + }] + } +} From ba877d78cf75ec1945b269460643f1371c7fdd37 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 13:29:02 -0400 Subject: [PATCH 04/24] Working local agent --- src/prefect/agent/__init__.py | 3 +- src/prefect/agent/local/__init__.py | 1 + src/prefect/agent/local/agent.py | 76 +++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 src/prefect/agent/local/__init__.py create mode 100644 src/prefect/agent/local/agent.py diff --git a/src/prefect/agent/__init__.py b/src/prefect/agent/__init__.py index efbdc315d2ec..e1d9ae33c914 100644 --- a/src/prefect/agent/__init__.py +++ b/src/prefect/agent/__init__.py @@ -7,4 +7,5 @@ 'Using `prefect.agent.kubernetes` requires Prefect to be installed with the "kubernetes" extra.' ) -import prefect_agent.nomad +import prefect.agent.local +import prefect.agent.nomad diff --git a/src/prefect/agent/local/__init__.py b/src/prefect/agent/local/__init__.py new file mode 100644 index 000000000000..5af9f495b2b4 --- /dev/null +++ b/src/prefect/agent/local/__init__.py @@ -0,0 +1 @@ +from prefect.agent.local.agent import LocalAgent diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py new file mode 100644 index 000000000000..2eded7131311 --- /dev/null +++ b/src/prefect/agent/local/agent.py @@ -0,0 +1,76 @@ +import os +from os import path +import uuid +import sys + +import docker +import yaml + +from prefect import config +from prefect.agent import Agent +from prefect.environments.storage import Docker +from prefect.serialization.storage import StorageSchema +from prefect.utilities.graphql import GraphQLResult + + +class LocalAgent(Agent): + """ + Agent which deploys flow runs locally as Docker containers. + """ + + def __init__(self) -> None: + super().__init__() + + self.docker_client = docker.APIClient(base_url="unix://var/run/docker.sock") + try: + self.docker_client.ping() + except Exception as exc: + self.logger.error( + "Issue connecting to the Docker daemon. Make sure it is running." + ) + raise exc + + def deploy_flows(self, flow_runs: list) -> None: + """ + Deploy flow runs on to a k8s cluster as jobs + + Args: + - flow_runs (list): A list of GraphQLResult flow run objects + """ + for flow_run in flow_runs: + + storage = StorageSchema().load(flow_run.flow.storage) + if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker): + self.logger.error( + f"Storage for flow run {flow_run.id} is not of type Docker." + ) + continue + + env_vars = self.populate_env_vars(flow_run=flow_run) + + # Pull image if it doesn't exist locally + self.docker_client.pull(storage.name) + + # Create a container + container = self.docker_client.create_container( + storage.name, command="prefect execute cloud-flow", environment=env_vars + ) + + # Start the container + self.docker_client.start(container=container.get("Id")) + + def populate_env_vars(self, flow_run: GraphQLResult) -> dict: + return { + "PREFECT__CLOUD__API": config.cloud.api, + "PREFECT__CLOUD__AUTH_TOKEN": config.cloud.agent.auth_token, + "PREFECT__CONTEXT__FLOW_RUN_ID": flow_run.id, # type: ignore + "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false", + "PREFECT__LOGGING__LOG_TO_CLOUD": "true", + "PREFECT__LOGGING__LEVEL": "DEBUG", + "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner", + "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner", + } + + +if __name__ == "__main__": + LocalAgent().start() From 4477336dc31591b818a197e3e4e53546b8bab022 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 14:21:33 -0400 Subject: [PATCH 05/24] Local agent base URL parameterization and doc cleanup --- src/prefect/agent/kubernetes/agent.py | 2 +- src/prefect/agent/local/agent.py | 27 +++++++++++++++++++-------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/prefect/agent/kubernetes/agent.py b/src/prefect/agent/kubernetes/agent.py index 54fa7f612809..d50ac59de438 100644 --- a/src/prefect/agent/kubernetes/agent.py +++ b/src/prefect/agent/kubernetes/agent.py @@ -57,7 +57,7 @@ def replace_job_spec_yaml(self, flow_run: GraphQLResult) -> dict: Populate metadata and variables in the job_spec.yaml file for flow runs Args: - - flow_run (GraphQLResult): A flow run objects + - flow_run (GraphQLResult): A flow run object Returns: - dict: a dictionary representing the populated yaml object diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py index 2eded7131311..3038531c73fd 100644 --- a/src/prefect/agent/local/agent.py +++ b/src/prefect/agent/local/agent.py @@ -1,10 +1,4 @@ -import os -from os import path -import uuid -import sys - import docker -import yaml from prefect import config from prefect.agent import Agent @@ -16,12 +10,20 @@ class LocalAgent(Agent): """ Agent which deploys flow runs locally as Docker containers. + + Args: + - base_url (str, optional): URL for a Docker daemon server. Defaults to + `unix:///var/run/docker.sock` however other hosts such as + `tcp://0.0.0.0:2375` can be provided """ - def __init__(self) -> None: + def __init__(self, base_url: str = None) -> None: super().__init__() - self.docker_client = docker.APIClient(base_url="unix://var/run/docker.sock") + base_url = base_url or "unix://var/run/docker.sock" + self.docker_client = docker.APIClient(base_url=base_url) + + # Ping Docker daemon for connection issues try: self.docker_client.ping() except Exception as exc: @@ -60,6 +62,15 @@ def deploy_flows(self, flow_runs: list) -> None: self.docker_client.start(container=container.get("Id")) def populate_env_vars(self, flow_run: GraphQLResult) -> dict: + """ + Populate metadata and variables in the environment variables for a flow run + + Args: + - flow_run (GraphQLResult): A flow run object + + Returns: + - dict: a dictionary representing the populated environment variables + """ return { "PREFECT__CLOUD__API": config.cloud.api, "PREFECT__CLOUD__AUTH_TOKEN": config.cloud.agent.auth_token, From 8bca40d90d08acec5ca0f4113b6ab39cd60a8fa4 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 14:25:12 -0400 Subject: [PATCH 06/24] Clean up k8s and nomad agents for mypy checks --- src/prefect/agent/kubernetes/agent.py | 1 - src/prefect/agent/kubernetes/job_spec.yaml | 2 -- src/prefect/agent/kubernetes/resource_manager.py | 6 +++--- src/prefect/agent/nomad/job_spec.nomad | 1 - 4 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/prefect/agent/kubernetes/agent.py b/src/prefect/agent/kubernetes/agent.py index d50ac59de438..fc8989370cd1 100644 --- a/src/prefect/agent/kubernetes/agent.py +++ b/src/prefect/agent/kubernetes/agent.py @@ -1,4 +1,3 @@ -import logging import os from os import path import uuid diff --git a/src/prefect/agent/kubernetes/job_spec.yaml b/src/prefect/agent/kubernetes/job_spec.yaml index 3c016bcc4ac9..b3b0cc1d0aa6 100644 --- a/src/prefect/agent/kubernetes/job_spec.yaml +++ b/src/prefect/agent/kubernetes/job_spec.yaml @@ -33,8 +33,6 @@ spec: value: "true" - name: PREFECT__LOGGING__LEVEL value: "DEBUG" - - name: PREFECT__CLOUD__USE_LOCAL_SECRETS - value: "false" - name: PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS value: "prefect.engine.cloud.CloudFlowRunner" - name: PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS diff --git a/src/prefect/agent/kubernetes/resource_manager.py b/src/prefect/agent/kubernetes/resource_manager.py index 8752cb94ac25..dd7c64e22bc4 100644 --- a/src/prefect/agent/kubernetes/resource_manager.py +++ b/src/prefect/agent/kubernetes/resource_manager.py @@ -231,7 +231,7 @@ def report_failed_pod(self, pod: client.V1Pod) -> None: self.client.write_run_log( flow_run_id=pod.metadata.labels.get("flow_run_id"), task_run_id="", - timestamp=str(pendulum.now()), + timestamp=pendulum.now(), name="resource-manager", message=logs, level="ERROR", @@ -248,7 +248,7 @@ def report_unknown_pod(self, pod: client.V1Pod) -> None: self.client.write_run_log( flow_run_id=pod.metadata.labels.get("flow_run_id"), task_run_id="", - timestamp=str(pendulum.now()), + timestamp=pendulum.now(), name="resource-manager", message=f"Flow run pod {name} entered an unknown state in namespace {self.namespace}", level="ERROR", @@ -270,7 +270,7 @@ def report_pod_image_pull_error(self, pod: client.V1Pod) -> None: self.client.write_run_log( flow_run_id=pod.metadata.labels.get("flow_run_id"), task_run_id="", - timestamp=str(pendulum.now()), + timestamp=pendulum.now(), name="resource-manager", message=f"Flow run image pull error for pod {pod.metadata.name} in namespace {self.namespace}", level="ERROR", diff --git a/src/prefect/agent/nomad/job_spec.nomad b/src/prefect/agent/nomad/job_spec.nomad index 1967702bb638..ea1b1a83be8c 100644 --- a/src/prefect/agent/nomad/job_spec.nomad +++ b/src/prefect/agent/nomad/job_spec.nomad @@ -20,7 +20,6 @@ "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false", "PREFECT__LOGGING__LOG_TO_CLOUD": "true", "PREFECT__LOGGING__LEVEL": "DEBUG", - "PREFECT__CLOUD__USE_LOCAL_SECRETS": "false", "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner", "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner" }, From fc72aa154bdc29023e79503f60955596bf518b2b Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 14:55:24 -0400 Subject: [PATCH 07/24] Add agent start CLI command` --- src/prefect/agent/local/agent.py | 2 +- src/prefect/cli/__init__.py | 3 ++ src/prefect/cli/agent.py | 58 ++++++++++++++++++++++++++++++++ src/prefect/cli/auth.py | 3 -- 4 files changed, 62 insertions(+), 4 deletions(-) create mode 100644 src/prefect/cli/agent.py diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py index 3038531c73fd..6ec809863b3e 100644 --- a/src/prefect/agent/local/agent.py +++ b/src/prefect/agent/local/agent.py @@ -21,7 +21,7 @@ def __init__(self, base_url: str = None) -> None: super().__init__() base_url = base_url or "unix://var/run/docker.sock" - self.docker_client = docker.APIClient(base_url=base_url) + self.docker_client = docker.APIClient(base_url=base_url, version="auto") # Ping Docker daemon for connection issues try: diff --git a/src/prefect/cli/__init__.py b/src/prefect/cli/__init__.py index 620a5b90866b..a91b24bdd06c 100644 --- a/src/prefect/cli/__init__.py +++ b/src/prefect/cli/__init__.py @@ -5,6 +5,7 @@ import prefect +from .agent import agent as _agent from .auth import auth as _auth from .describe import describe as _describe from .execute import execute as _execute @@ -33,6 +34,7 @@ def cli(): Execution Commands: execute Execute a flow's environment run Run a flow + agent Manage agents \b Setup Commands: @@ -46,6 +48,7 @@ def cli(): pass +cli.add_command(_agent) cli.add_command(_auth) cli.add_command(_describe) cli.add_command(_execute) diff --git a/src/prefect/cli/agent.py b/src/prefect/cli/agent.py new file mode 100644 index 000000000000..b7b986166041 --- /dev/null +++ b/src/prefect/cli/agent.py @@ -0,0 +1,58 @@ +import click + +from prefect import config +from prefect import agent as prefect_agent +from prefect.utilities.configuration import set_temporary_config + +_agents = { + "local": prefect_agent.local.LocalAgent, + "kubernetes": prefect_agent.kubernetes.KubernetesAgent, + "nomad": prefect_agent.nomad.NomadAgent, +} + + +@click.group(hidden=True) +def agent(): + """ + Manage Prefect agents. + + \b + Usage: + $ prefect agent [COMMAND] + + \b + Arguments: + start Start a Prefect agent + + \b + Examples: + $ prefect agent start + + \b + $ prefect agent start kubernetes --token MY_TOKEN + """ + pass + + +@agent.command(hidden=True) +@click.argument("name", default="local") +@click.option( + "--token", "-t", required=False, help="A Prefect Cloud API token.", hidden=True +) +def start(name, token): + """ + Start an agent. + + \b + Arguments: + name TEXT The name of an agent to start (e.g. `local`, `kubernetes`, `nomad`) + Defaults to `local` + + \b + Options: + --token, -t TEXT A Prefect Cloud api token + """ + with set_temporary_config( + {"cloud.agent.auth_token": token or config.cloud.agent.auth_token} + ): + _agents.get(name, ValueError())().start() diff --git a/src/prefect/cli/auth.py b/src/prefect/cli/auth.py index 936144a270a3..ffb292ab2c6c 100644 --- a/src/prefect/cli/auth.py +++ b/src/prefect/cli/auth.py @@ -20,9 +20,6 @@ def auth(): \b Examples: $ prefect auth login --token MY_TOKEN - - \b - $ prefect auth login --token MY_TOKEN """ pass From 5afed329210c13a559f891ddf717e07ade0ac24b Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 15:04:27 -0400 Subject: [PATCH 08/24] Added tests for agent start CLI command --- src/prefect/cli/agent.py | 8 ++++++- tests/cli/test_agent.py | 47 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 tests/cli/test_agent.py diff --git a/src/prefect/cli/agent.py b/src/prefect/cli/agent.py index b7b986166041..b645c7e4517a 100644 --- a/src/prefect/cli/agent.py +++ b/src/prefect/cli/agent.py @@ -55,4 +55,10 @@ def start(name, token): with set_temporary_config( {"cloud.agent.auth_token": token or config.cloud.agent.auth_token} ): - _agents.get(name, ValueError())().start() + retrieved_agent = _agents.get(name, None) + + if not retrieved_agent: + click.secho(f"{name} is not a valid agent", color="red") + return + + retrieved_agent().start() diff --git a/tests/cli/test_agent.py b/tests/cli/test_agent.py new file mode 100644 index 000000000000..794a28a88aac --- /dev/null +++ b/tests/cli/test_agent.py @@ -0,0 +1,47 @@ +from unittest.mock import MagicMock + +from click.testing import CliRunner + +from prefect.cli.agent import agent + + +def test_agent_init(): + runner = CliRunner() + result = runner.invoke(agent) + assert result.exit_code == 0 + assert "Manage Prefect agents." in result.output + + +def test_agent_help(): + runner = CliRunner() + result = runner.invoke(agent, ["--help"]) + assert result.exit_code == 0 + assert "Manage Prefect agents." in result.output + + +def test_agent_start(monkeypatch): + start = MagicMock() + monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start) + + runner = CliRunner() + result = runner.invoke(agent, ["start"]) + assert result.exit_code == 0 + + +def test_agent_start_token(monkeypatch): + start = MagicMock() + monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start) + + runner = CliRunner() + result = runner.invoke(agent, ["start", "-t", "test"]) + assert result.exit_code == 0 + + +def test_agent_start_fails(monkeypatch): + start = MagicMock() + monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start) + + runner = CliRunner() + result = runner.invoke(agent, ["start", "TEST"]) + assert result.exit_code == 0 + assert "TEST is not a valid agent" in result.output From 35789827fc53a784c40e83e3476fe325a2c992da Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 15:06:23 -0400 Subject: [PATCH 09/24] Omit agent coverage check temporarily --- setup.cfg | 2 ++ 1 file changed, 2 insertions(+) diff --git a/setup.cfg b/setup.cfg index be7ec546f90f..af5bbf3ce6b5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -51,6 +51,8 @@ omit= **__init__.py *_version.py *_siginfo.py + # temporary agent omit + src/prefect/agent/* parallel = True [coverage:report] From 15f1fcc25c43f3eba60a65323e488e63e38550d1 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 15:21:51 -0400 Subject: [PATCH 10/24] Move KubernetesAgent import level due to package extras requirement --- src/prefect/agent/__init__.py | 11 +++-------- src/prefect/agent/kubernetes/__init__.py | 9 +++++++-- src/prefect/cli/agent.py | 3 ++- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/prefect/agent/__init__.py b/src/prefect/agent/__init__.py index e1d9ae33c914..1c59893468c2 100644 --- a/src/prefect/agent/__init__.py +++ b/src/prefect/agent/__init__.py @@ -1,11 +1,6 @@ -from prefect.agent.agent import Agent - -try: - import prefect.agent.kubernetes -except ImportError: - raise ImportError( - 'Using `prefect.agent.kubernetes` requires Prefect to be installed with the "kubernetes" extra.' - ) +# only agents that don't require `extras` should be automatically imported here; +# others must be explicitly imported so they can raise helpful errors if appropriate +from prefect.agent.agent import Agent import prefect.agent.local import prefect.agent.nomad diff --git a/src/prefect/agent/kubernetes/__init__.py b/src/prefect/agent/kubernetes/__init__.py index d4877b5ac645..7e69517fba7b 100644 --- a/src/prefect/agent/kubernetes/__init__.py +++ b/src/prefect/agent/kubernetes/__init__.py @@ -1,2 +1,7 @@ -from prefect.agent.kubernetes.agent import KubernetesAgent -from prefect.agent.kubernetes.resource_manager import ResourceManager +try: + from prefect.agent.kubernetes.agent import KubernetesAgent + from prefect.agent.kubernetes.resource_manager import ResourceManager +except ImportError: + raise ImportError( + 'Using `prefect.agent.kubernetes` requires Prefect to be installed with the "kubernetes" extra.' + ) diff --git a/src/prefect/cli/agent.py b/src/prefect/cli/agent.py index b645c7e4517a..8f37324e4de5 100644 --- a/src/prefect/cli/agent.py +++ b/src/prefect/cli/agent.py @@ -2,11 +2,12 @@ from prefect import config from prefect import agent as prefect_agent +from prefect.agent.kubernetes import KubernetesAgent from prefect.utilities.configuration import set_temporary_config _agents = { "local": prefect_agent.local.LocalAgent, - "kubernetes": prefect_agent.kubernetes.KubernetesAgent, + "kubernetes": KubernetesAgent, "nomad": prefect_agent.nomad.NomadAgent, } From 280d561d167fd016351dd14b855fe57ad458c0d1 Mon Sep 17 00:00:00 2001 From: Josh Meek <40716964+joshmeek@users.noreply.github.com> Date: Fri, 9 Aug 2019 16:57:41 -0400 Subject: [PATCH 11/24] Update src/prefect/agent/agent.py Co-Authored-By: Chris White --- src/prefect/agent/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/agent/agent.py b/src/prefect/agent/agent.py index 5bc64923910d..f50c89ac484c 100644 --- a/src/prefect/agent/agent.py +++ b/src/prefect/agent/agent.py @@ -14,7 +14,7 @@ class Agent: """ Base class for Agents. - This Agent class is a standard point for interacting with Prefect Cloud. It is meant + This Agent class is a standard point for executing Flows in Prefect Cloud. It is meant to have subclasses which inherit functionality from this class. The only piece that the subclasses should implement is the `deploy_flows` function. It is built in this way to keep Prefect Cloud logic standard but allows for platform specific From c671b88baccd72ddef66b615ac591971613a6a23 Mon Sep 17 00:00:00 2001 From: Josh Meek <40716964+joshmeek@users.noreply.github.com> Date: Fri, 9 Aug 2019 16:57:52 -0400 Subject: [PATCH 12/24] Update src/prefect/agent/agent.py Co-Authored-By: Chris White --- src/prefect/agent/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/agent/agent.py b/src/prefect/agent/agent.py index f50c89ac484c..be71e06de932 100644 --- a/src/prefect/agent/agent.py +++ b/src/prefect/agent/agent.py @@ -16,7 +16,7 @@ class Agent: This Agent class is a standard point for executing Flows in Prefect Cloud. It is meant to have subclasses which inherit functionality from this class. The only piece that - the subclasses should implement is the `deploy_flows` function. It is built in this + the subclasses should implement is the `deploy_flows` function, which specifies how to run a Flow on the given platform. It is built in this way to keep Prefect Cloud logic standard but allows for platform specific customizability. From 5e8de828068d6e0265875e8a9d4df99079d8e726 Mon Sep 17 00:00:00 2001 From: Josh Meek <40716964+joshmeek@users.noreply.github.com> Date: Fri, 9 Aug 2019 16:58:00 -0400 Subject: [PATCH 13/24] Update src/prefect/agent/agent.py Co-Authored-By: Chris White --- src/prefect/agent/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/agent/agent.py b/src/prefect/agent/agent.py index be71e06de932..48fa7ed8a21a 100644 --- a/src/prefect/agent/agent.py +++ b/src/prefect/agent/agent.py @@ -21,7 +21,7 @@ class Agent: customizability. In order for this to operate `PREFECT__CLOUD__AGENT__AUTH_TOKEN` must be set as an - environment variable. + environment variable or in your user configuration file. """ def __init__(self) -> None: From 98a72ef84eed707d83b0d1feea911d8d485473fc Mon Sep 17 00:00:00 2001 From: Josh Meek <40716964+joshmeek@users.noreply.github.com> Date: Fri, 9 Aug 2019 16:58:13 -0400 Subject: [PATCH 14/24] Update src/prefect/agent/kubernetes/README.md Co-Authored-By: Chris White --- src/prefect/agent/kubernetes/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/agent/kubernetes/README.md b/src/prefect/agent/kubernetes/README.md index bbe98e3bf769..88d83522ae6c 100644 --- a/src/prefect/agent/kubernetes/README.md +++ b/src/prefect/agent/kubernetes/README.md @@ -1,6 +1,6 @@ # k8s-agent -The Prefect Kubernetes agent that turns a cluster into a workflow orchestration system. +The Prefect Kubernetes agent that turns a cluster into a workflow execution platform, orchestrated by Prefect Cloud. If running on GKE you may need to execute: `kubectl create clusterrolebinding default-admin --clusterrole cluster-admin --serviceaccount=default:default` From 88634a5b41105f1af38714660a537f7ac3129b93 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 17:00:45 -0400 Subject: [PATCH 15/24] Address minor comments from PR #1341 --- src/prefect/agent/local/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py index 6ec809863b3e..df4b95241e1d 100644 --- a/src/prefect/agent/local/agent.py +++ b/src/prefect/agent/local/agent.py @@ -34,7 +34,7 @@ def __init__(self, base_url: str = None) -> None: def deploy_flows(self, flow_runs: list) -> None: """ - Deploy flow runs on to a k8s cluster as jobs + Deploy flow runs on your local machine as Docker containers Args: - flow_runs (list): A list of GraphQLResult flow run objects From aac10441c6b309edbfe6a9badc271814d2a5c134 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 17:02:53 -0400 Subject: [PATCH 16/24] Add note in k8s agent README about permissions --- src/prefect/agent/kubernetes/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/prefect/agent/kubernetes/README.md b/src/prefect/agent/kubernetes/README.md index 88d83522ae6c..2ddd920d4eca 100644 --- a/src/prefect/agent/kubernetes/README.md +++ b/src/prefect/agent/kubernetes/README.md @@ -4,6 +4,8 @@ The Prefect Kubernetes agent that turns a cluster into a workflow execution plat If running on GKE you may need to execute: `kubectl create clusterrolebinding default-admin --clusterrole cluster-admin --serviceaccount=default:default` +The agent needs to be able to read, list, and create both pods and jobs. The resource manager aspect needs the same permissions with the added role of being able to delete jobs and pods. A more specific set of permissions will be added in a later PR. + Quick Start: - Build Dockerfile and push to registry From 57d6e1465236443a567d7a91bf4448979317cc67 Mon Sep 17 00:00:00 2001 From: Josh Meek <40716964+joshmeek@users.noreply.github.com> Date: Fri, 9 Aug 2019 17:07:40 -0400 Subject: [PATCH 17/24] Update src/prefect/agent/kubernetes/README.md Co-Authored-By: Chris White --- src/prefect/agent/kubernetes/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prefect/agent/kubernetes/README.md b/src/prefect/agent/kubernetes/README.md index 2ddd920d4eca..5dce269b237d 100644 --- a/src/prefect/agent/kubernetes/README.md +++ b/src/prefect/agent/kubernetes/README.md @@ -4,7 +4,7 @@ The Prefect Kubernetes agent that turns a cluster into a workflow execution plat If running on GKE you may need to execute: `kubectl create clusterrolebinding default-admin --clusterrole cluster-admin --serviceaccount=default:default` -The agent needs to be able to read, list, and create both pods and jobs. The resource manager aspect needs the same permissions with the added role of being able to delete jobs and pods. A more specific set of permissions will be added in a later PR. +The agent needs to be able to read, list, and create both pods and jobs. The resource manager aspect needs the same permissions with the added role of being able to delete jobs and pods. Quick Start: From 534bb0dde73e051801598dc9a44fb98c508d9c99 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 17:31:24 -0400 Subject: [PATCH 18/24] Adjust kubernetes import level for tests --- src/prefect/agent/__init__.py | 1 + src/prefect/agent/kubernetes/__init__.py | 9 +-- src/prefect/agent/kubernetes/agent.py | 9 +-- .../agent/kubernetes/resource_manager.py | 59 ++++++++++++------- src/prefect/cli/agent.py | 11 ++-- 5 files changed, 50 insertions(+), 39 deletions(-) diff --git a/src/prefect/agent/__init__.py b/src/prefect/agent/__init__.py index 1c59893468c2..043b2829f164 100644 --- a/src/prefect/agent/__init__.py +++ b/src/prefect/agent/__init__.py @@ -3,4 +3,5 @@ from prefect.agent.agent import Agent import prefect.agent.local +import prefect.agent.kubernetes import prefect.agent.nomad diff --git a/src/prefect/agent/kubernetes/__init__.py b/src/prefect/agent/kubernetes/__init__.py index 7e69517fba7b..d4877b5ac645 100644 --- a/src/prefect/agent/kubernetes/__init__.py +++ b/src/prefect/agent/kubernetes/__init__.py @@ -1,7 +1,2 @@ -try: - from prefect.agent.kubernetes.agent import KubernetesAgent - from prefect.agent.kubernetes.resource_manager import ResourceManager -except ImportError: - raise ImportError( - 'Using `prefect.agent.kubernetes` requires Prefect to be installed with the "kubernetes" extra.' - ) +from prefect.agent.kubernetes.agent import KubernetesAgent +from prefect.agent.kubernetes.resource_manager import ResourceManager diff --git a/src/prefect/agent/kubernetes/agent.py b/src/prefect/agent/kubernetes/agent.py index fc8989370cd1..407f4b1be567 100644 --- a/src/prefect/agent/kubernetes/agent.py +++ b/src/prefect/agent/kubernetes/agent.py @@ -2,7 +2,6 @@ from os import path import uuid -from kubernetes import client, config import yaml from prefect.agent import Agent @@ -21,12 +20,16 @@ class KubernetesAgent(Agent): def __init__(self) -> None: super().__init__() + from kubernetes import client, config + try: config.load_incluster_config() except config.config_exception.ConfigException as exc: self.logger.warning(f"{exc} Using out of cluster configuration option.") config.load_kube_config() + self.batch_client = client.BatchV1Api() + def deploy_flows(self, flow_runs: list) -> None: """ Deploy flow runs on to a k8s cluster as jobs @@ -34,8 +37,6 @@ def deploy_flows(self, flow_runs: list) -> None: Args: - flow_runs (list): A list of GraphQLResult flow run objects """ - batch_client = client.BatchV1Api() - for flow_run in flow_runs: # Require Docker storage @@ -47,7 +48,7 @@ def deploy_flows(self, flow_runs: list) -> None: job_spec = self.replace_job_spec_yaml(flow_run) - batch_client.create_namespaced_job( + self.batch_client.create_namespaced_job( namespace=os.getenv("NAMESPACE", "default"), body=job_spec ) diff --git a/src/prefect/agent/kubernetes/resource_manager.py b/src/prefect/agent/kubernetes/resource_manager.py index dd7c64e22bc4..f654e3f94881 100644 --- a/src/prefect/agent/kubernetes/resource_manager.py +++ b/src/prefect/agent/kubernetes/resource_manager.py @@ -2,10 +2,15 @@ import os import time -from kubernetes import client, config import pendulum from prefect import Client +from prefect import config + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import kubernetes class ResourceManager: @@ -32,12 +37,16 @@ def __init__(self) -> None: self.logger = logger + from kubernetes import client, config + try: config.load_incluster_config() except config.config_exception.ConfigException as exc: self.logger.warning(f"{exc} Using out of cluster configuration option.") config.load_kube_config() + self.k8s_client = client + def start(self) -> None: """ Main loop which waits on a `LOOP_INTERVAL` and looks for finished jobs to clean @@ -56,11 +65,11 @@ def clean_resources(self) -> None: """ Find jobs that are either completed or failed to delete from the cluster """ - batch_client = client.BatchV1Api() + batch_client = self.k8s_client.BatchV1Api() try: jobs = batch_client.list_namespaced_job(namespace=self.namespace) - except client.rest.ApiException: + except self.k8s_client.rest.ApiException: self.logger.error( f"Error attempting to list jobs in namespace {self.namespace}" ) @@ -89,11 +98,11 @@ def clean_extra_pods(self) -> None: Any runaway pods which failed due to unexpected reasons will be cleaned up here. ImagePullBackoffs, Evictions, etc... """ - core_client = client.CoreV1Api() + core_client = self.k8s_client.CoreV1Api() try: pods = core_client.list_namespaced_pod(namespace=self.namespace) - except client.rest.ApiException: + except self.k8s_client.rest.ApiException: self.logger.error( f"Error attempting to list pods in namespace {self.namespace}" ) @@ -123,14 +132,16 @@ def delete_job(self, name: str) -> None: """ Delete a job based on the name """ - batch_client = client.BatchV1Api() + batch_client = self.k8s_client.BatchV1Api() self.logger.info(f"Deleting job {name} in namespace {self.namespace}") try: batch_client.delete_namespaced_job( - name=name, namespace=self.namespace, body=client.V1DeleteOptions() + name=name, + namespace=self.namespace, + body=self.k8s_client.V1DeleteOptions(), ) - except client.rest.ApiException: + except self.k8s_client.rest.ApiException: self.logger.error( f"Error attempting to delete job {name} in namespace {self.namespace}" ) @@ -139,13 +150,13 @@ def delete_pods(self, job_name: str, identifier: str) -> None: """ Delete a pod based on the job name and identifier """ - core_client = client.CoreV1Api() + core_client = self.k8s_client.CoreV1Api() try: pods = core_client.list_namespaced_pod( namespace=self.namespace, label_selector="identifier={}".format(identifier), ) - except client.rest.ApiException: + except self.k8s_client.rest.ApiException: self.logger.error( f"Error attempting to list pods in namespace {self.namespace}" ) @@ -160,9 +171,11 @@ def delete_pods(self, job_name: str, identifier: str) -> None: try: core_client.delete_namespaced_pod( - name=name, namespace=self.namespace, body=client.V1DeleteOptions() + name=name, + namespace=self.namespace, + body=self.k8s_client.V1DeleteOptions(), ) - except client.rest.ApiException: + except self.k8s_client.rest.ApiException: self.logger.error( f"Error attempting to delete pod {name} in namespace {self.namespace}" ) @@ -171,14 +184,16 @@ def delete_extra_pod(self, name: str) -> None: """ Delete a pod based on the name """ - core_client = client.CoreV1Api() + core_client = self.k8s_client.CoreV1Api() self.logger.info(f"Deleting extra pod {name} in namespace {self.namespace}") try: core_client.delete_namespaced_pod( - name=name, namespace=self.namespace, body=client.V1DeleteOptions() + name=name, + namespace=self.namespace, + body=self.k8s_client.V1DeleteOptions(), ) - except client.rest.ApiException: + except self.k8s_client.rest.ApiException: self.logger.error( f"Error attempting to delete pod {name} in namespace {self.namespace}" ) @@ -189,13 +204,13 @@ def report_failed_job(self, identifier: str) -> None: """ Report jobs that failed for reasons outside of a flow run """ - core_client = client.CoreV1Api() + core_client = self.k8s_client.CoreV1Api() try: pods = core_client.list_namespaced_pod( namespace=self.namespace, label_selector="identifier={}".format(identifier), ) - except client.rest.ApiException: + except self.k8s_client.rest.ApiException: self.logger.error( f"Error attempting to list pods in namespace {self.namespace}" ) @@ -206,11 +221,11 @@ def report_failed_job(self, identifier: str) -> None: if phase == "Failed": self.report_failed_pod(pod) - def report_failed_pod(self, pod: client.V1Pod) -> None: + def report_failed_pod(self, pod: "kubernetes.client.V1Pod") -> None: """ Report pods that failed for reasons outside of a flow run. Write cloud log """ - core_client = client.CoreV1Api() + core_client = self.k8s_client.CoreV1Api() name = pod.metadata.name if pod.status.reason == "Evicted": @@ -220,7 +235,7 @@ def report_failed_pod(self, pod: client.V1Pod) -> None: logs = core_client.read_namespaced_pod_log( namespace=self.namespace, name=name ) - except client.rest.ApiException: + except self.k8s_client.rest.ApiException: self.logger.error( f"Error attempting to read pod logs for {name} in namespace {self.namespace}" ) @@ -238,7 +253,7 @@ def report_failed_pod(self, pod: client.V1Pod) -> None: info={}, ) - def report_unknown_pod(self, pod: client.V1Pod) -> None: + def report_unknown_pod(self, pod: "kubernetes.client.V1Pod") -> None: """ Write cloud log of pods that entered unknonw states """ @@ -255,7 +270,7 @@ def report_unknown_pod(self, pod: client.V1Pod) -> None: info={}, ) - def report_pod_image_pull_error(self, pod: client.V1Pod) -> None: + def report_pod_image_pull_error(self, pod: "kubernetes.client.V1Pod") -> None: """ Write cloud log of pods that ahd image pull errors """ diff --git a/src/prefect/cli/agent.py b/src/prefect/cli/agent.py index 8f37324e4de5..1e0f9f18c3a6 100644 --- a/src/prefect/cli/agent.py +++ b/src/prefect/cli/agent.py @@ -1,14 +1,13 @@ import click from prefect import config -from prefect import agent as prefect_agent -from prefect.agent.kubernetes import KubernetesAgent from prefect.utilities.configuration import set_temporary_config +from prefect.utilities.serialization import from_qualified_name _agents = { - "local": prefect_agent.local.LocalAgent, - "kubernetes": KubernetesAgent, - "nomad": prefect_agent.nomad.NomadAgent, + "local": "prefect.agent.local.LocalAgent", + "kubernetes": "prefect.agent.kubernetes.KubernetesAgent", + "nomad": "prefect.agent.nomad.NomadAgent", } @@ -62,4 +61,4 @@ def start(name, token): click.secho(f"{name} is not a valid agent", color="red") return - retrieved_agent().start() + from_qualified_name(retrieved_agent)().start() From 7904d8918e175fbba978523af04788642e3de885 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 17:34:11 -0400 Subject: [PATCH 19/24] Update CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 187e8f65f20d..78b5c65a96f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/ ### Features -- None +- Added Local, Kubernetes, and Nomad agents - [#1341](https://github.com/PrefectHQ/prefect/pull/1341) ### Enhancements From 7c6b600d1515d90e4d03dc26848c147b12934454 Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 17:51:22 -0400 Subject: [PATCH 20/24] Remove f strings for 3.5 compatibility --- src/prefect/agent/kubernetes/agent.py | 6 +- .../agent/kubernetes/resource_manager.py | 64 +++++++++++++------ src/prefect/agent/local/agent.py | 2 +- src/prefect/agent/nomad/agent.py | 2 +- src/prefect/cli/agent.py | 2 +- 5 files changed, 52 insertions(+), 24 deletions(-) diff --git a/src/prefect/agent/kubernetes/agent.py b/src/prefect/agent/kubernetes/agent.py index 407f4b1be567..11a57cfb1058 100644 --- a/src/prefect/agent/kubernetes/agent.py +++ b/src/prefect/agent/kubernetes/agent.py @@ -25,7 +25,9 @@ def __init__(self) -> None: try: config.load_incluster_config() except config.config_exception.ConfigException as exc: - self.logger.warning(f"{exc} Using out of cluster configuration option.") + self.logger.warning( + "{} Using out of cluster configuration option.".format(exc) + ) config.load_kube_config() self.batch_client = client.BatchV1Api() @@ -42,7 +44,7 @@ def deploy_flows(self, flow_runs: list) -> None: # Require Docker storage if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker): self.logger.error( - f"Storage for flow run {flow_run.id} is not of type Docker." + "Storage for flow run {} is not of type Docker.".format(flow_run.id) ) continue diff --git a/src/prefect/agent/kubernetes/resource_manager.py b/src/prefect/agent/kubernetes/resource_manager.py index f654e3f94881..9b383692a710 100644 --- a/src/prefect/agent/kubernetes/resource_manager.py +++ b/src/prefect/agent/kubernetes/resource_manager.py @@ -42,7 +42,9 @@ def __init__(self) -> None: try: config.load_incluster_config() except config.config_exception.ConfigException as exc: - self.logger.warning(f"{exc} Using out of cluster configuration option.") + self.logger.warning( + "{} Using out of cluster configuration option.".format(exc) + ) config.load_kube_config() self.k8s_client = client @@ -51,7 +53,7 @@ def start(self) -> None: """ Main loop which waits on a `LOOP_INTERVAL` and looks for finished jobs to clean """ - self.logger.info(f"Starting {type(self).__name__}") + self.logger.info("Starting {}".format(type(self).__name__)) while True: try: self.clean_resources() @@ -71,7 +73,7 @@ def clean_resources(self) -> None: jobs = batch_client.list_namespaced_job(namespace=self.namespace) except self.k8s_client.rest.ApiException: self.logger.error( - f"Error attempting to list jobs in namespace {self.namespace}" + "Error attempting to list jobs in namespace {}".format(self.namespace) ) return @@ -83,7 +85,9 @@ def clean_resources(self) -> None: if job.status.failed: self.logger.info( - f"Found failed job {name} in namespace {self.namespace}" + "Found failed job {} in namespace {}".format( + name, self.namespace + ) ) self.report_failed_job(identifier=identifier) @@ -104,7 +108,7 @@ def clean_extra_pods(self) -> None: pods = core_client.list_namespaced_pod(namespace=self.namespace) except self.k8s_client.rest.ApiException: self.logger.error( - f"Error attempting to list pods in namespace {self.namespace}" + "Error attempting to list pods in namespace {}".format(self.namespace) ) return @@ -133,7 +137,7 @@ def delete_job(self, name: str) -> None: Delete a job based on the name """ batch_client = self.k8s_client.BatchV1Api() - self.logger.info(f"Deleting job {name} in namespace {self.namespace}") + self.logger.info("Deleting job {} in namespace {}".format(name, self.namespace)) try: batch_client.delete_namespaced_job( @@ -143,7 +147,9 @@ def delete_job(self, name: str) -> None: ) except self.k8s_client.rest.ApiException: self.logger.error( - f"Error attempting to delete job {name} in namespace {self.namespace}" + "Error attempting to delete job {} in namespace {}".format( + name, self.namespace + ) ) def delete_pods(self, job_name: str, identifier: str) -> None: @@ -158,13 +164,15 @@ def delete_pods(self, job_name: str, identifier: str) -> None: ) except self.k8s_client.rest.ApiException: self.logger.error( - f"Error attempting to list pods in namespace {self.namespace}" + "Error attempting to list pods in namespace {}".format(self.namespace) ) return if pods: self.logger.info( - f"Deleting {len(pods.items)} pods for job {job_name} in namespace {self.namespace}" + "Deleting {} pods for job {} in namespace {}".format( + len(pods.items), job_name, self.namespace + ) ) for pod in pods.items: name = pod.metadata.name @@ -177,7 +185,9 @@ def delete_pods(self, job_name: str, identifier: str) -> None: ) except self.k8s_client.rest.ApiException: self.logger.error( - f"Error attempting to delete pod {name} in namespace {self.namespace}" + "Error attempting to delete pod {} in namespace {}".format( + name, self.namespace + ) ) def delete_extra_pod(self, name: str) -> None: @@ -185,7 +195,9 @@ def delete_extra_pod(self, name: str) -> None: Delete a pod based on the name """ core_client = self.k8s_client.CoreV1Api() - self.logger.info(f"Deleting extra pod {name} in namespace {self.namespace}") + self.logger.info( + "Deleting extra pod {} in namespace {}".format(name, self.namespace) + ) try: core_client.delete_namespaced_pod( @@ -195,7 +207,9 @@ def delete_extra_pod(self, name: str) -> None: ) except self.k8s_client.rest.ApiException: self.logger.error( - f"Error attempting to delete pod {name} in namespace {self.namespace}" + "Error attempting to delete pod {} in namespace {}".format( + name, self.namespace + ) ) # REPORTING @@ -212,7 +226,7 @@ def report_failed_job(self, identifier: str) -> None: ) except self.k8s_client.rest.ApiException: self.logger.error( - f"Error attempting to list pods in namespace {self.namespace}" + "Error attempting to list pods in namespace {}".format(self.namespace) ) return @@ -237,11 +251,15 @@ def report_failed_pod(self, pod: "kubernetes.client.V1Pod") -> None: ) except self.k8s_client.rest.ApiException: self.logger.error( - f"Error attempting to read pod logs for {name} in namespace {self.namespace}" + "Error attempting to read pod logs for {} in namespace {}".format( + name, self.namespace + ) ) return - self.logger.info(f"Reporting failed pod {name} in namespace {self.namespace}") + self.logger.info( + "Reporting failed pod {} in namespace {}".format(name, self.namespace) + ) self.client.write_run_log( flow_run_id=pod.metadata.labels.get("flow_run_id"), @@ -258,14 +276,18 @@ def report_unknown_pod(self, pod: "kubernetes.client.V1Pod") -> None: Write cloud log of pods that entered unknonw states """ name = pod.metadata.name - self.logger.info(f"Reporting unknown pod {name} in namespace {self.namespace}") + self.logger.info( + "Reporting unknown pod {} in namespace {}".format(name, self.namespace) + ) self.client.write_run_log( flow_run_id=pod.metadata.labels.get("flow_run_id"), task_run_id="", timestamp=pendulum.now(), name="resource-manager", - message=f"Flow run pod {name} entered an unknown state in namespace {self.namespace}", + message="Flow run pod {} entered an unknown state in namespace {}".format( + name, self.namespace + ), level="ERROR", info={}, ) @@ -279,7 +301,9 @@ def report_pod_image_pull_error(self, pod: "kubernetes.client.V1Pod") -> None: if waiting and waiting.reason == "ImagePullBackoff": self.logger.info( - f"Reporting image pull error for pod {pod.metadata.name} in namespace {self.namespace}" + "Reporting image pull error for pod {} in namespace {}".format( + pod.metadata.name, self.namespace + ) ) self.client.write_run_log( @@ -287,7 +311,9 @@ def report_pod_image_pull_error(self, pod: "kubernetes.client.V1Pod") -> None: task_run_id="", timestamp=pendulum.now(), name="resource-manager", - message=f"Flow run image pull error for pod {pod.metadata.name} in namespace {self.namespace}", + message="Flow run image pull error for pod {} in namespace {}".format( + pod.metadata.name, self.namespace + ), level="ERROR", info={}, ) diff --git a/src/prefect/agent/local/agent.py b/src/prefect/agent/local/agent.py index df4b95241e1d..752eb8ea4f14 100644 --- a/src/prefect/agent/local/agent.py +++ b/src/prefect/agent/local/agent.py @@ -44,7 +44,7 @@ def deploy_flows(self, flow_runs: list) -> None: storage = StorageSchema().load(flow_run.flow.storage) if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker): self.logger.error( - f"Storage for flow run {flow_run.id} is not of type Docker." + "Storage for flow run {} is not of type Docker.".format(flow_run.id) ) continue diff --git a/src/prefect/agent/nomad/agent.py b/src/prefect/agent/nomad/agent.py index bc866656ec8e..a2e1b787a851 100644 --- a/src/prefect/agent/nomad/agent.py +++ b/src/prefect/agent/nomad/agent.py @@ -31,7 +31,7 @@ def deploy_flows(self, flow_runs: list) -> None: if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker): self.logger.error( - f"Storage for flow run {flow_run.id} is not of type Docker." + "Storage for flow run {} is not of type Docker.".format(flow_run.id) ) continue diff --git a/src/prefect/cli/agent.py b/src/prefect/cli/agent.py index 1e0f9f18c3a6..6c8070ff80e8 100644 --- a/src/prefect/cli/agent.py +++ b/src/prefect/cli/agent.py @@ -58,7 +58,7 @@ def start(name, token): retrieved_agent = _agents.get(name, None) if not retrieved_agent: - click.secho(f"{name} is not a valid agent", color="red") + click.secho("{} is not a valid agent".format(name), color="red") return from_qualified_name(retrieved_agent)().start() From e62f8f6b65fedf550ef18d01aa077b1ff248e22a Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 17:55:22 -0400 Subject: [PATCH 21/24] Mock docker client in agent CLI tests --- tests/cli/test_agent.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/cli/test_agent.py b/tests/cli/test_agent.py index 794a28a88aac..826b4a4e8df9 100644 --- a/tests/cli/test_agent.py +++ b/tests/cli/test_agent.py @@ -23,6 +23,9 @@ def test_agent_start(monkeypatch): start = MagicMock() monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start) + docker_client = MagicMock() + monkeypatch.setattr("prefect.agent.local.agent.docker.APIClient", docker_client) + runner = CliRunner() result = runner.invoke(agent, ["start"]) assert result.exit_code == 0 @@ -32,6 +35,9 @@ def test_agent_start_token(monkeypatch): start = MagicMock() monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start) + docker_client = MagicMock() + monkeypatch.setattr("prefect.agent.local.agent.docker.APIClient", docker_client) + runner = CliRunner() result = runner.invoke(agent, ["start", "-t", "test"]) assert result.exit_code == 0 @@ -41,6 +47,9 @@ def test_agent_start_fails(monkeypatch): start = MagicMock() monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start) + docker_client = MagicMock() + monkeypatch.setattr("prefect.agent.local.agent.docker.APIClient", docker_client) + runner = CliRunner() result = runner.invoke(agent, ["start", "TEST"]) assert result.exit_code == 0 From 0a03db9606f8745f88e63267a7c788c44ceafbbc Mon Sep 17 00:00:00 2001 From: Josh Meek Date: Fri, 9 Aug 2019 17:59:00 -0400 Subject: [PATCH 22/24] Fix remaining f strings from agent --- src/prefect/agent/agent.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/prefect/agent/agent.py b/src/prefect/agent/agent.py index 48fa7ed8a21a..41fff40ab6f9 100644 --- a/src/prefect/agent/agent.py +++ b/src/prefect/agent/agent.py @@ -47,20 +47,22 @@ def start(self) -> None: The main entrypoint to the agent. This function loops and constantly polls for new flow runs to deploy """ - self.logger.info(f"Starting {type(self).__name__}") + self.logger.info("Starting {}".format(type(self).__name__)) tenant_id = self.query_tenant_id() while True: try: flow_runs = self.query_flow_runs(tenant_id=tenant_id) self.logger.info( - f"Found {len(flow_runs)} flow run(s) to submit for execution." + "Found {} flow run(s) to submit for execution.".format( + len(flow_runs) + ) ) self.update_states(flow_runs) self.deploy_flows(flow_runs) self.logger.info( - f"Submitted {len(flow_runs)} flow run(s) for execution." + "Submitted {} flow run(s) for execution.".format(len(flow_runs)) ) except Exception as exc: self.logger.error(exc) From 072955ff8e0afb933814f8ff15344e44840d034f Mon Sep 17 00:00:00 2001 From: Josh Meek <40716964+joshmeek@users.noreply.github.com> Date: Fri, 9 Aug 2019 18:14:45 -0400 Subject: [PATCH 23/24] Update setup.cfg --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index af5bbf3ce6b5..41b73eee4a33 100644 --- a/setup.cfg +++ b/setup.cfg @@ -52,7 +52,7 @@ omit= *_version.py *_siginfo.py # temporary agent omit - src/prefect/agent/* + agent/* parallel = True [coverage:report] From f5b02f4266f2071aa98e1402be9dd8b58fcf91f0 Mon Sep 17 00:00:00 2001 From: Josh Meek <40716964+joshmeek@users.noreply.github.com> Date: Fri, 9 Aug 2019 18:39:52 -0400 Subject: [PATCH 24/24] Update setup.cfg Co-Authored-By: Chris White --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 41b73eee4a33..85bbf964079c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -52,7 +52,7 @@ omit= *_version.py *_siginfo.py # temporary agent omit - agent/* + */agent/* parallel = True [coverage:report]