Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hello Agent #1341

Merged
merged 28 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2676fb6
Added base agent class and updated config/client settings
joshmeek Aug 8, 2019
b9677ac
Added Kubernetes agent
joshmeek Aug 8, 2019
adcb3ed
Add Nomad agent
joshmeek Aug 9, 2019
ba877d7
Working local agent
joshmeek Aug 9, 2019
4477336
Local agent base URL parameterization and doc cleanup
joshmeek Aug 9, 2019
8bca40d
Clean up k8s and nomad agents for mypy checks
joshmeek Aug 9, 2019
fc72aa1
Add agent start CLI command`
joshmeek Aug 9, 2019
5afed32
Added tests for agent start CLI command
joshmeek Aug 9, 2019
3578982
Omit agent coverage check temporarily
joshmeek Aug 9, 2019
e8e7116
Merge branch 'master' into agents
joshmeek Aug 9, 2019
15f1fcc
Move KubernetesAgent import level due to package extras requirement
joshmeek Aug 9, 2019
a500201
Merge branch 'agents' of https://github.com/PrefectHQ/prefect into ag…
joshmeek Aug 9, 2019
280d561
Update src/prefect/agent/agent.py
joshmeek Aug 9, 2019
c671b88
Update src/prefect/agent/agent.py
joshmeek Aug 9, 2019
5e8de82
Update src/prefect/agent/agent.py
joshmeek Aug 9, 2019
98a72ef
Update src/prefect/agent/kubernetes/README.md
joshmeek Aug 9, 2019
88634a5
Address minor comments from PR #1341
joshmeek Aug 9, 2019
aac1044
Add note in k8s agent README about permissions
joshmeek Aug 9, 2019
57d6e14
Update src/prefect/agent/kubernetes/README.md
joshmeek Aug 9, 2019
ee375f8
Merge branch 'master' into agents
joshmeek Aug 9, 2019
534bb0d
Adjust kubernetes import level for tests
joshmeek Aug 9, 2019
43e225a
Merge branch 'agents' of https://github.com/PrefectHQ/prefect into ag…
joshmeek Aug 9, 2019
7904d89
Update CHANGELOG
joshmeek Aug 9, 2019
7c6b600
Remove f strings for 3.5 compatibility
joshmeek Aug 9, 2019
e62f8f6
Mock docker client in agent CLI tests
joshmeek Aug 9, 2019
0a03db9
Fix remaining f strings from agent
joshmeek Aug 9, 2019
072955f
Update setup.cfg
joshmeek Aug 9, 2019
f5b02f4
Update setup.cfg
joshmeek Aug 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ omit=
**__init__.py
*_version.py
*_siginfo.py
# temporary agent omit
src/prefect/agent/*
joshmeek marked this conversation as resolved.
Show resolved Hide resolved
parallel = True

[coverage:report]
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import prefect.serialization

import prefect.agent

from ._version import get_versions

__version__ = get_versions()["version"] # type: ignore
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# only agents that don't require `extras` should be automatically imported here;
# others must be explicitly imported so they can raise helpful errors if appropriate

from prefect.agent.agent import Agent
import prefect.agent.local
import prefect.agent.nomad
197 changes: 197 additions & 0 deletions src/prefect/agent/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import logging
import os
import pendulum
import time

from prefect import config
from prefect.client import Client
from prefect.serialization import state
from prefect.engine.state import Submitted
from prefect.utilities.graphql import with_args


class Agent:
"""
Base class for Agents.

This Agent class is a standard point for executing Flows in Prefect Cloud. It is meant
to have subclasses which inherit functionality from this class. The only piece that
the subclasses should implement is the `deploy_flows` function, which specifies how to run a Flow on the given platform. It is built in this
way to keep Prefect Cloud logic standard but allows for platform specific
customizability.

In order for this to operate `PREFECT__CLOUD__AGENT__AUTH_TOKEN` must be set as an
environment variable or in your user configuration file.
"""

def __init__(self) -> None:
# self.auth_token = config.cloud.agent.get("auth_token")
self.loop_interval = config.cloud.agent.get("loop_interval")

self.client = Client(token=config.cloud.agent.get("auth_token"))

logger = logging.getLogger("agent")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
ch.setFormatter(formatter)
logger.addHandler(ch)

self.logger = logger

def start(self) -> None:
"""
The main entrypoint to the agent. This function loops and constantly polls for
new flow runs to deploy
"""
self.logger.info(f"Starting {type(self).__name__}")
tenant_id = self.query_tenant_id()

while True:
try:
flow_runs = self.query_flow_runs(tenant_id=tenant_id)
self.logger.info(
f"Found {len(flow_runs)} flow run(s) to submit for execution."
)

self.update_states(flow_runs)
self.deploy_flows(flow_runs)
self.logger.info(
f"Submitted {len(flow_runs)} flow run(s) for execution."
)
except Exception as exc:
self.logger.error(exc)
time.sleep(self.loop_interval)

def query_tenant_id(self) -> str:
"""
Query Prefect Cloud for the tenant id that corresponds to the agent's auth token

Returns:
- str: The current tenant id
"""
query = {"query": {"tenant": {"id"}}}
result = self.client.graphql(query)
return result.data.tenant[0].id # type: ignore

def query_flow_runs(self, tenant_id: str) -> list:
"""
Query Prefect Cloud for flow runs which need to be deployed and executed

Args:
- tenant_id (str): The tenant id to use in the query

Returns:
- list: A list of GraphQLResult flow run objects
"""

# Get scheduled flow runs from queue
mutation = {
"mutation($input: getRunsInQueueInput!)": {
"getRunsInQueue(input: $input)": {"flow_run_ids"}
}
}

result = self.client.graphql(
mutation, variables={"input": {"tenantId": tenant_id}}
)
flow_run_ids = result.data.getRunsInQueue.flow_run_ids # type: ignore
now = pendulum.now("UTC")

# Query metadata fow flow runs found in queue
query = {
"query": {
with_args(
"flow_run",
{
# match flow runs in the flow_run_ids list
"where": {
"id": {"_in": flow_run_ids},
"_or": [
# who are EITHER scheduled...
{"state": {"_eq": "Scheduled"}},
# OR running with task runs scheduled to start more than 3 seconds ago
{
"state": {"_eq": "Running"},
"task_runs": {
"state_start_time": {
"_lte": str(now.subtract(seconds=3))
}
},
},
],
}
},
): {
"id": True,
"version": True,
"tenant_id": True,
"state": True,
"serialized_state": True,
"parameters": True,
"flow": {"id", "name", "environment", "storage"},
with_args(
"task_runs",
{
"where": {
"state_start_time": {
"_lte": str(now.subtract(seconds=3))
}
}
},
): {"id", "version", "task_id", "serialized_state"},
}
}
}

result = self.client.graphql(query)
return result.data.flow_run # type: ignore

def update_states(self, flow_runs: list) -> None:
"""
After a flow run is grabbed this function sets the state to Submitted so it
won't be picked up by any other processes

Args:
- flow_runs (list): A list of GraphQLResult flow run objects
"""
for flow_run in flow_runs:

# Set flow run state to `Submitted` if it is currently `Scheduled`
if state.StateSchema().load(flow_run.serialized_state).is_scheduled():
self.client.set_flow_run_state(
flow_run_id=flow_run.id,
version=flow_run.version,
state=Submitted(
message="Submitted for execution",
state=state.StateSchema().load(flow_run.serialized_state),
),
)

# Set task run states to `Submitted` if they are currently `Scheduled`
for task_run in flow_run.task_runs:
if state.StateSchema().load(task_run.serialized_state).is_scheduled():
self.client.set_task_run_state(
task_run_id=task_run.id,
version=task_run.version,
state=Submitted(
message="Submitted for execution",
state=state.StateSchema().load(task_run.serialized_state),
),
)

def deploy_flows(self, flow_runs: list) -> None:
"""
Meant to be overridden by a platform specific deployment option

Args:
- flow_runs (list): A list of GraphQLResult flow run objects
"""
pass


if __name__ == "__main__":
Agent().start()
13 changes: 13 additions & 0 deletions src/prefect/agent/kubernetes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# k8s-agent

The Prefect Kubernetes agent that turns a cluster into a workflow execution platform, orchestrated by Prefect Cloud.

If running on GKE you may need to execute: `kubectl create clusterrolebinding default-admin --clusterrole cluster-admin --serviceaccount=default:default`
joshmeek marked this conversation as resolved.
Show resolved Hide resolved

The agent needs to be able to read, list, and create both pods and jobs. The resource manager aspect needs the same permissions with the added role of being able to delete jobs and pods. A more specific set of permissions will be added in a later PR.
joshmeek marked this conversation as resolved.
Show resolved Hide resolved

Quick Start:

- Build Dockerfile and push to registry
- Update the `image` and `PREFECT__CLOUD__AUTH_TOKEN` env value in the deployment yaml
- Run `kubectl apply -f deployment.yaml`
joshmeek marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 7 additions & 0 deletions src/prefect/agent/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
try:
from prefect.agent.kubernetes.agent import KubernetesAgent
from prefect.agent.kubernetes.resource_manager import ResourceManager
except ImportError:
raise ImportError(
'Using `prefect.agent.kubernetes` requires Prefect to be installed with the "kubernetes" extra.'
)
96 changes: 96 additions & 0 deletions src/prefect/agent/kubernetes/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import os
from os import path
import uuid

from kubernetes import client, config
import yaml

from prefect.agent import Agent
from prefect.environments.storage import Docker
from prefect.serialization.storage import StorageSchema
from prefect.utilities.graphql import GraphQLResult


class KubernetesAgent(Agent):
"""
Agent which deploys flow runs as Kubernetes jobs. Currently this is required to either
run on a k8s cluster or on a local machine where the kube_config is pointing at the
desired cluster.
"""

def __init__(self) -> None:
super().__init__()

try:
config.load_incluster_config()
except config.config_exception.ConfigException as exc:
self.logger.warning(f"{exc} Using out of cluster configuration option.")
config.load_kube_config()

def deploy_flows(self, flow_runs: list) -> None:
"""
Deploy flow runs on to a k8s cluster as jobs

Args:
- flow_runs (list): A list of GraphQLResult flow run objects
"""
batch_client = client.BatchV1Api()

for flow_run in flow_runs:

# Require Docker storage
if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker):
self.logger.error(
f"Storage for flow run {flow_run.id} is not of type Docker."
)
continue

job_spec = self.replace_job_spec_yaml(flow_run)

batch_client.create_namespaced_job(
namespace=os.getenv("NAMESPACE", "default"), body=job_spec
joshmeek marked this conversation as resolved.
Show resolved Hide resolved
)

def replace_job_spec_yaml(self, flow_run: GraphQLResult) -> dict:
"""
Populate metadata and variables in the job_spec.yaml file for flow runs

Args:
- flow_run (GraphQLResult): A flow run object

Returns:
- dict: a dictionary representing the populated yaml object
"""
with open(path.join(path.dirname(__file__), "job_spec.yaml"), "r") as job_file:
job = yaml.safe_load(job_file)

identifier = str(uuid.uuid4())[:8]
job_name = "prefect-job-{}".format(identifier)

# Populate job metadata for identification
job["metadata"]["name"] = job_name
job["metadata"]["labels"]["app"] = job_name
job["metadata"]["labels"]["identifier"] = identifier
job["metadata"]["labels"]["flow_run_id"] = flow_run.id # type: ignore
job["metadata"]["labels"]["flow_id"] = flow_run.flow.id # type: ignore
job["spec"]["template"]["metadata"]["labels"]["app"] = job_name
job["spec"]["template"]["metadata"]["labels"]["identifier"] = identifier

# Use flow storage image for job
job["spec"]["template"]["spec"]["containers"][0]["image"] = (
StorageSchema().load(flow_run.flow.storage).name # type: ignore
)

# Populate environment variables for flow run execution
env = job["spec"]["template"]["spec"]["containers"][0]["env"]

env[0]["value"] = os.getenv("PREFECT__CLOUD__API", "https://api.prefect.io")
env[1]["value"] = os.environ["PREFECT__CLOUD__AGENT__AUTH_TOKEN"]
env[2]["value"] = flow_run.id # type: ignore
env[3]["value"] = os.getenv("NAMESPACE", "default")

return job


if __name__ == "__main__":
KubernetesAgent().start()
55 changes: 55 additions & 0 deletions src/prefect/agent/kubernetes/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: prefect-agent
labels:
app: prefect-agent
spec:
replicas: 1
selector:
matchLabels:
app: prefect-agent
template:
metadata:
labels:
app: prefect-agent
spec:
containers:
- name: agent
image: prefecthq/prefect:latest
imagePullPolicy: IfNotPresent
command: ["/bin/bash", "-c"]
args: ["prefect agent start kubernetes-agent"] # COMMAND TBD
env:
- name: PREFECT__CLOUD__AGENT__AUTH_TOKEN
value: ""
- name: PREFECT__CLOUD__API
value: "https://api.prefect.io"
- name: PREFECT__CLOUD__AGENT__LOOP_INTERVAL
value: "5"
- name: NAMESPACE
value: "default"
resources:
limits:
memory: "128Mi"
cpu: "100m"
- name: resource-manager
image: prefecthq/prefect:latest
imagePullPolicy: IfNotPresent
command: ["/bin/bash", "-c"]
args: ["prefect agent start kubernetes-resource-manager"] # COMMAND TBD
env:
- name: PREFECT__CLOUD__AGENT__AUTH_TOKEN
value: ""
- name: PREFECT__CLOUD__API
value: "https://api.prefect.io"
- name: PREFECT__CLOUD__AGENT__RESOURCE_MANAGER__LOOP_INTERVAL
value: "60"
- name: NAMESPACE
value: "default"
resources:
limits:
memory: "128Mi"
cpu: "100m"
imagePullSecrets:
- name: ""
Loading