Merge pull request #1341 from PrefectHQ/agents

Hello Agent

joshmeek committed Aug 9, 2019
2 parents e7e4253 + f5b02f4 commit 6f10159
Showing 24 changed files with 1,141 additions and 6 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -6,7 +6,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Features

- None
- Added Local, Kubernetes, and Nomad agents - [#1341](https://github.com/PrefectHQ/prefect/pull/1341)

### Enhancements

2 changes: 2 additions & 0 deletions setup.cfg
@@ -51,6 +51,8 @@ omit=
**__init__.py
*_version.py
*_siginfo.py
# temporary agent omit
*/agent/*
parallel = True

[coverage:report]
2 changes: 2 additions & 0 deletions src/prefect/__init__.py
@@ -16,6 +16,8 @@

import prefect.serialization

import prefect.agent

from ._version import get_versions

__version__ = get_versions()["version"] # type: ignore
7 changes: 7 additions & 0 deletions src/prefect/agent/__init__.py
@@ -0,0 +1,7 @@
# only agents that don't require `extras` should be automatically imported here;
# others must be explicitly imported so they can raise helpful errors if appropriate

from prefect.agent.agent import Agent
import prefect.agent.local
import prefect.agent.kubernetes
import prefect.agent.nomad
199 changes: 199 additions & 0 deletions src/prefect/agent/agent.py
@@ -0,0 +1,199 @@
import logging
import os
import pendulum
import time

from prefect import config
from prefect.client import Client
from prefect.serialization import state
from prefect.engine.state import Submitted
from prefect.utilities.graphql import with_args


class Agent:
"""
Base class for Agents.
This Agent class is a standard point for executing Flows in Prefect Cloud. It is meant
to have subclasses which inherit functionality from this class. The only piece that
the subclasses should implement is the `deploy_flows` function, which specifies how to
run a Flow on the given platform. It is built in this way to keep Prefect Cloud logic
standard while allowing for platform-specific customizability.
In order for this to operate, `PREFECT__CLOUD__AGENT__AUTH_TOKEN` must be set as an
environment variable or in your user configuration file.
"""

def __init__(self) -> None:
# self.auth_token = config.cloud.agent.get("auth_token")
self.loop_interval = config.cloud.agent.get("loop_interval")

self.client = Client(token=config.cloud.agent.get("auth_token"))

logger = logging.getLogger("agent")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
ch.setFormatter(formatter)
logger.addHandler(ch)

self.logger = logger

def start(self) -> None:
"""
The main entrypoint to the agent. This function loops and constantly polls for
new flow runs to deploy
"""
self.logger.info("Starting {}".format(type(self).__name__))
tenant_id = self.query_tenant_id()

while True:
try:
flow_runs = self.query_flow_runs(tenant_id=tenant_id)
self.logger.info(
"Found {} flow run(s) to submit for execution.".format(
len(flow_runs)
)
)

self.update_states(flow_runs)
self.deploy_flows(flow_runs)
self.logger.info(
"Submitted {} flow run(s) for execution.".format(len(flow_runs))
)
except Exception as exc:
self.logger.error(exc)
time.sleep(self.loop_interval)

def query_tenant_id(self) -> str:
"""
Query Prefect Cloud for the tenant id that corresponds to the agent's auth token
Returns:
- str: The current tenant id
"""
query = {"query": {"tenant": {"id"}}}
result = self.client.graphql(query)
return result.data.tenant[0].id # type: ignore

def query_flow_runs(self, tenant_id: str) -> list:
"""
Query Prefect Cloud for flow runs which need to be deployed and executed
Args:
- tenant_id (str): The tenant id to use in the query
Returns:
- list: A list of GraphQLResult flow run objects
"""

# Get scheduled flow runs from queue
mutation = {
"mutation($input: getRunsInQueueInput!)": {
"getRunsInQueue(input: $input)": {"flow_run_ids"}
}
}

result = self.client.graphql(
mutation, variables={"input": {"tenantId": tenant_id}}
)
flow_run_ids = result.data.getRunsInQueue.flow_run_ids # type: ignore
now = pendulum.now("UTC")

# Query metadata for flow runs found in queue
query = {
"query": {
with_args(
"flow_run",
{
# match flow runs in the flow_run_ids list
"where": {
"id": {"_in": flow_run_ids},
"_or": [
# who are EITHER scheduled...
{"state": {"_eq": "Scheduled"}},
# OR running with task runs scheduled to start more than 3 seconds ago
{
"state": {"_eq": "Running"},
"task_runs": {
"state_start_time": {
"_lte": str(now.subtract(seconds=3))
}
},
},
],
}
},
): {
"id": True,
"version": True,
"tenant_id": True,
"state": True,
"serialized_state": True,
"parameters": True,
"flow": {"id", "name", "environment", "storage"},
with_args(
"task_runs",
{
"where": {
"state_start_time": {
"_lte": str(now.subtract(seconds=3))
}
}
},
): {"id", "version", "task_id", "serialized_state"},
}
}
}

result = self.client.graphql(query)
return result.data.flow_run # type: ignore

def update_states(self, flow_runs: list) -> None:
"""
After a flow run is grabbed this function sets the state to Submitted so it
won't be picked up by any other processes
Args:
- flow_runs (list): A list of GraphQLResult flow run objects
"""
for flow_run in flow_runs:

# Set flow run state to `Submitted` if it is currently `Scheduled`
if state.StateSchema().load(flow_run.serialized_state).is_scheduled():
self.client.set_flow_run_state(
flow_run_id=flow_run.id,
version=flow_run.version,
state=Submitted(
message="Submitted for execution",
state=state.StateSchema().load(flow_run.serialized_state),
),
)

# Set task run states to `Submitted` if they are currently `Scheduled`
for task_run in flow_run.task_runs:
if state.StateSchema().load(task_run.serialized_state).is_scheduled():
self.client.set_task_run_state(
task_run_id=task_run.id,
version=task_run.version,
state=Submitted(
message="Submitted for execution",
state=state.StateSchema().load(task_run.serialized_state),
),
)

def deploy_flows(self, flow_runs: list) -> None:
"""
Meant to be overridden by a platform specific deployment option
Args:
- flow_runs (list): A list of GraphQLResult flow run objects
"""
pass


if __name__ == "__main__":
Agent().start()
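
For orientation, here is a minimal sketch (not part of this diff) of the subclassing contract the `Agent` docstring describes: a platform-specific agent only overrides `deploy_flows`, while polling, state updates, and logging come from the base class. The `EchoAgent` name and log-only deployment are purely illustrative.

```python
from prefect.agent import Agent


class EchoAgent(Agent):
    """Hypothetical agent that only logs what it would deploy."""

    def deploy_flows(self, flow_runs: list) -> None:
        # A real agent would launch each flow run on its platform here
        for flow_run in flow_runs:
            self.logger.info("Would deploy flow run {}".format(flow_run.id))


if __name__ == "__main__":
    # PREFECT__CLOUD__AGENT__AUTH_TOKEN must be set as an environment
    # variable (or in the user config) before this process starts
    EchoAgent().start()
```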
13 changes: 13 additions & 0 deletions src/prefect/agent/kubernetes/README.md
@@ -0,0 +1,13 @@
# k8s-agent

The Prefect Kubernetes agent that turns a cluster into a workflow execution platform, orchestrated by Prefect Cloud.

If running on GKE you may need to execute: `kubectl create clusterrolebinding default-admin --clusterrole cluster-admin --serviceaccount=default:default`

The agent needs to be able to read, list, and create both pods and jobs. The resource manager needs the same permissions, plus the ability to delete jobs and pods (a sketch for checking these permissions with the Kubernetes Python client follows the quick start below).

Quick Start:

- Build the Dockerfile and push the image to a registry
- Update the `image` and the `PREFECT__CLOUD__AUTH_TOKEN` env value in the deployment YAML
- Run `kubectl apply -f deployment.yaml`
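
As a hedged illustration of the permissions listed above (not part of this PR), the `kubernetes` Python client can ask the API server whether the current service account is allowed to create jobs via a `SelfSubjectAccessReview`; the namespace and resource values below are assumptions.

```python
# Minimal sketch: check whether the active service account may create batch jobs.
# Assumes the `kubernetes` package and an in-cluster (or kube_config) context.
from kubernetes import client, config

try:
    config.load_incluster_config()
except config.config_exception.ConfigException:
    config.load_kube_config()

review = client.V1SelfSubjectAccessReview(
    spec=client.V1SelfSubjectAccessReviewSpec(
        resource_attributes=client.V1ResourceAttributes(
            group="batch", resource="jobs", verb="create", namespace="default"
        )
    )
)
response = client.AuthorizationV1Api().create_self_subject_access_review(review)
print("can create jobs:", response.status.allowed)
```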
2 changes: 2 additions & 0 deletions src/prefect/agent/kubernetes/__init__.py
@@ -0,0 +1,2 @@
from prefect.agent.kubernetes.agent import KubernetesAgent
from prefect.agent.kubernetes.resource_manager import ResourceManager
99 changes: 99 additions & 0 deletions src/prefect/agent/kubernetes/agent.py
@@ -0,0 +1,99 @@
import os
from os import path
import uuid

import yaml

from prefect.agent import Agent
from prefect.environments.storage import Docker
from prefect.serialization.storage import StorageSchema
from prefect.utilities.graphql import GraphQLResult


class KubernetesAgent(Agent):
"""
Agent which deploys flow runs as Kubernetes jobs. Currently this agent must run either
on a k8s cluster or on a local machine where the kube_config points at the
desired cluster.
"""

def __init__(self) -> None:
super().__init__()

from kubernetes import client, config

try:
config.load_incluster_config()
except config.config_exception.ConfigException as exc:
self.logger.warning(
"{} Using out of cluster configuration option.".format(exc)
)
config.load_kube_config()

self.batch_client = client.BatchV1Api()

def deploy_flows(self, flow_runs: list) -> None:
"""
Deploy flow runs onto a k8s cluster as jobs
Args:
- flow_runs (list): A list of GraphQLResult flow run objects
"""
for flow_run in flow_runs:

# Require Docker storage
if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker):
self.logger.error(
"Storage for flow run {} is not of type Docker.".format(flow_run.id)
)
continue

job_spec = self.replace_job_spec_yaml(flow_run)

self.batch_client.create_namespaced_job(
namespace=os.getenv("NAMESPACE", "default"), body=job_spec
)

def replace_job_spec_yaml(self, flow_run: GraphQLResult) -> dict:
"""
Populate metadata and variables in the job_spec.yaml file for flow runs
Args:
- flow_run (GraphQLResult): A flow run object
Returns:
- dict: a dictionary representing the populated yaml object
"""
with open(path.join(path.dirname(__file__), "job_spec.yaml"), "r") as job_file:
job = yaml.safe_load(job_file)

identifier = str(uuid.uuid4())[:8]
job_name = "prefect-job-{}".format(identifier)

# Populate job metadata for identification
job["metadata"]["name"] = job_name
job["metadata"]["labels"]["app"] = job_name
job["metadata"]["labels"]["identifier"] = identifier
job["metadata"]["labels"]["flow_run_id"] = flow_run.id # type: ignore
job["metadata"]["labels"]["flow_id"] = flow_run.flow.id # type: ignore
job["spec"]["template"]["metadata"]["labels"]["app"] = job_name
job["spec"]["template"]["metadata"]["labels"]["identifier"] = identifier

# Use flow storage image for job
job["spec"]["template"]["spec"]["containers"][0]["image"] = (
StorageSchema().load(flow_run.flow.storage).name # type: ignore
)

# Populate environment variables for flow run execution
env = job["spec"]["template"]["spec"]["containers"][0]["env"]

env[0]["value"] = os.getenv("PREFECT__CLOUD__API", "https://api.prefect.io")
env[1]["value"] = os.environ["PREFECT__CLOUD__AGENT__AUTH_TOKEN"]
env[2]["value"] = flow_run.id # type: ignore
env[3]["value"] = os.getenv("NAMESPACE", "default")

return job


if __name__ == "__main__":
KubernetesAgent().start()
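
For reference, a small sketch (not included in this PR) of how the labels applied in `replace_job_spec_yaml` could be used to look up the job created for a particular flow run; the flow run id shown is a placeholder.

```python
import os

from kubernetes import client, config

config.load_kube_config()  # or load_incluster_config() inside a cluster
batch_client = client.BatchV1Api()

flow_run_id = "<flow-run-id>"  # hypothetical id
jobs = batch_client.list_namespaced_job(
    namespace=os.getenv("NAMESPACE", "default"),
    label_selector="flow_run_id={}".format(flow_run_id),
)
for job in jobs.items:
    print(job.metadata.name, job.status.succeeded)
```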