Add Fargate Agent #1521

Merged
merged 9 commits on Sep 17, 2019
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -6,7 +6,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

### Features

- None
- Added Fargate agent - [#1521](https://github.com/PrefectHQ/prefect/pull/1521)

### Enhancements

1 change: 1 addition & 0 deletions src/prefect/agent/__init__.py
@@ -2,6 +2,7 @@
# others must be explicitly imported so they can raise helpful errors if appropriate

from prefect.agent.agent import Agent
import prefect.agent.fargate
import prefect.agent.local
import prefect.agent.kubernetes
import prefect.agent.nomad
1 change: 1 addition & 0 deletions src/prefect/agent/fargate/__init__.py
@@ -0,0 +1 @@
from prefect.agent.fargate.agent import FargateAgent
249 changes: 249 additions & 0 deletions src/prefect/agent/fargate/agent.py
@@ -0,0 +1,249 @@
import os

from prefect import config
from prefect.agent import Agent
from prefect.environments.storage import Docker
from prefect.serialization.storage import StorageSchema
from prefect.utilities.graphql import GraphQLResult


class FargateAgent(Agent):
"""
Agent which deploys flow runs as tasks using Fargate. This agent can run anywhere as
long as the proper access configuration variables are set.

Args:
- aws_access_key_id (str, optional): AWS access key id for connecting the boto3
client. Defaults to the value set in the environment variable
`AWS_ACCESS_KEY_ID`.
- aws_secret_access_key (str, optional): AWS secret access key for connecting
the boto3 client. Defaults to the value set in the environment variable
`AWS_SECRET_ACCESS_KEY`.
- region_name (str, optional): AWS region name for connecting the boto3 client.
Defaults to the value set in the environment variable `REGION_NAME`.
- cluster (str, optional): The Fargate cluster to deploy tasks. Defaults to the
value set in the environment variable `CLUSTER` or `default` otherwise.
- subnets (list, optional): A list of AWS VPC subnets to use for the tasks that
are deployed on Fargate. Defaults to the subnets found which have
`MapPublicIpOnLaunch` disabled.
- security_groups (list, optional): A list of security groups to associate with
the deployed tasks. Defaults to the default security group of the VPC.
- repository_credentials (str, optional): An Amazon Resource Name (ARN) of the
secret containing the private repository credentials. Defaults to the value
set in the environment variable `REPOSITORY_CREDENTIALS`.
- assign_public_ip (str, optional): Whether the task's elastic network interface
receives a public IP address. Defaults to the value set in the environment
variable `ASSIGN_PUBLIC_IP` or `ENABLED` otherwise.
- task_cpu (str, optional): The number of cpu units reserved for the container.
Defaults to the value set in the environment variable `TASK_CPU` or `256`
otherwise.
- task_memory (str, optional): The hard limit (in MiB) of memory to present to
the container. Defaults to the value set in the environment variable
`TASK_MEMORY` or `512` otherwise.
"""

def __init__(
self,
aws_access_key_id: str = None,
aws_secret_access_key: str = None,
region_name: str = None,
cluster: str = None,
subnets: list = None,
security_groups: list = None,
repository_credentials: str = None,
assign_public_ip: str = None,
task_cpu: str = None,
task_memory: str = None,
) -> None:
super().__init__()

from boto3 import client as boto3_client

# Config used for boto3 client initialization
aws_access_key_id = aws_access_key_id or os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = aws_secret_access_key or os.getenv(
"AWS_SECRET_ACCESS_KEY"
)
region_name = region_name or os.getenv("REGION_NAME")

# Agent task config
self.cluster = cluster or os.getenv("CLUSTER", "default")
self.subnets = subnets or []
self.security_groups = security_groups or []
self.repository_credentials = repository_credentials or os.getenv(
"REPOSITORY_CREDENTIALS"
)
self.assign_public_ip = assign_public_ip or os.getenv(
"ASSIGN_PUBLIC_IP", "ENABLED"
)
self.task_cpu = task_cpu or os.getenv("TASK_CPU", "256")
self.task_memory = task_memory or os.getenv("TASK_MEMORY", "512")

# Client initialization
self.boto3_client = boto3_client(
"ecs",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
)

# Look for default subnets with `MapPublicIpOnLaunch` disabled
if not subnets:
ec2 = boto3_client(
"ec2",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name,
)
for subnet in ec2.describe_subnets()["Subnets"]:
if not subnet.get("MapPublicIpOnLaunch"):
self.subnets.append(subnet.get("SubnetId"))

def deploy_flows(self, flow_runs: list) -> None:
"""
Deploy flow runs to Fargate

Args:
- flow_runs (list): A list of GraphQLResult flow run objects
"""
for flow_run in flow_runs:

# Require Docker storage
if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker):
self.logger.error(
"Storage for flow run {} is not of type Docker.".format(flow_run.id)
)
continue

# check if task definition exists
if not self._verify_task_definition_exists(flow_run):
self._create_task_definition(flow_run)

# run task
self._run_task(flow_run)

def _verify_task_definition_exists(self, flow_run: GraphQLResult) -> bool:
"""
Check if a task definition already exists for the flow

Args:
- flow_run (GraphQLResult): A GraphQLResult flow run object

Returns:
- bool: whether or not a preexisting task definition is found for this flow
"""
from botocore.exceptions import ClientError

try:
self.boto3_client.describe_task_definition(
taskDefinition="prefect-task-{}".format(
flow_run.flow.id[:8] # type: ignore
)
)
except ClientError:
return False

return True

def _create_task_definition(self, flow_run: GraphQLResult) -> None:
"""
Create a task definition for the flow that each flow run will use. This function
is only called when a flow is run for the first time.

Args:
- flow_run (GraphQLResult): A GraphQLResult flow run object
"""
container_definitions = [
{
"name": "flow",
"image": StorageSchema()
.load(flow_run.flow.storage) # type: ignore
.name,
"command": ["/bin/sh", "-c", "prefect execute cloud-flow"],
"environment": [
{
"name": "PREFECT__CLOUD__API",
"value": config.cloud.api or "https://api.prefect.io",
},
{"name": "PREFECT__CLOUD__USE_LOCAL_SECRETS", "value": "false"},
{"name": "PREFECT__LOGGING__LOG_TO_CLOUD", "value": "true"},
{"name": "PREFECT__LOGGING__LEVEL", "value": "DEBUG"},
{
"name": "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS",
"value": "prefect.engine.cloud.CloudFlowRunner",
},
{
"name": "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS",
"value": "prefect.engine.cloud.CloudTaskRunner",
},
],
"essential": True,
}
]

# Assign repository credentials if they are specified
if self.repository_credentials:
container_definitions[0]["repositoryCredentials"] = {
"credentialsParameter": self.repository_credentials
}

# Register task definition
self.boto3_client.register_task_definition(
family="prefect-task-{}".format(flow_run.flow.id[:8]), # type: ignore
containerDefinitions=container_definitions,
requiresCompatibilities=["FARGATE"],
networkMode="awsvpc",
cpu=self.task_cpu,
memory=self.task_memory,
)

def _run_task(self, flow_run: GraphQLResult) -> None:
"""
Run a task using the flow run.

Args:
- flow_run (GraphQLResult): A GraphQLResult flow run object
"""
container_overrides = [
{
"name": "flow",
"environment": [
{
"name": "PREFECT__CLOUD__AUTH_TOKEN",
"value": config.cloud.agent.auth_token,
},
{
"name": "PREFECT__CONTEXT__FLOW_RUN_ID",
"value": flow_run.id, # type: ignore
},
],
}
]

network_configuration = {
"awsvpcConfiguration": {
"subnets": self.subnets,
"assignPublicIp": self.assign_public_ip,
}
}

# Assign task security groups if they are specified
if self.security_groups:
network_configuration["awsvpcConfiguration"][
"securityGroups"
] = self.security_groups

# Run task
self.boto3_client.run_task(
cluster=self.cluster,
taskDefinition="prefect-task-{}".format(
flow_run.flow.id[:8] # type: ignore
),
overrides={"containerOverrides": container_overrides},
launchType="FARGATE",
networkConfiguration=network_configuration,
)


if __name__ == "__main__":
FargateAgent().start()
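
For reference, a minimal usage sketch of the agent this file introduces. The region, cluster, subnet, and security group values below are placeholders, and it assumes AWS credentials and a Prefect Cloud agent token are already configured (via the environment variables and `config.cloud.agent.auth_token` that the agent reads):

```python
# Hypothetical usage sketch of the FargateAgent added in this PR.
# All resource identifiers below are placeholders, not real AWS resources.
from prefect.agent.fargate import FargateAgent

agent = FargateAgent(
    region_name="us-east-1",          # otherwise read from REGION_NAME
    cluster="example-cluster",        # otherwise CLUSTER, falling back to "default"
    subnets=["subnet-00000000"],      # otherwise discovered via ec2.describe_subnets()
    security_groups=["sg-00000000"],  # otherwise the VPC's default security group
    task_cpu="256",
    task_memory="512",
)

# Poll Prefect Cloud for scheduled flow runs and deploy each one as a Fargate
# task through deploy_flows() above.
agent.start()
```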
6 changes: 5 additions & 1 deletion src/prefect/cli/agent.py
@@ -5,6 +5,7 @@
from prefect.utilities.serialization import from_qualified_name

_agents = {
"fargate": "prefect.agent.fargate.FargateAgent",
"local": "prefect.agent.local.LocalAgent",
"kubernetes": "prefect.agent.kubernetes.KubernetesAgent",
"nomad": "prefect.agent.nomad.NomadAgent",
@@ -54,12 +55,15 @@ def start(name, token, no_pull, base_url):

\b
Arguments:
name TEXT The name of an agent to start (e.g. `local`, `kubernetes`, `nomad`)
name TEXT The name of an agent to start (e.g. `local`, `kubernetes`, `fargate`, `nomad`)
Defaults to `local`

\b
Options:
--token, -t TEXT A Prefect Cloud API token with RUNNER scope

\b
Local Agent Options:
--base-url, -b TEXT A Docker daemon host URL for a LocalAgent
--no-pull Pull images for a LocalAgent
Defaults to pulling if not provided
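
As an aside, a sketch of what the new `"fargate"` entry in the `_agents` mapping enables: `prefect agent start fargate` presumably resolves the dotted path with `from_qualified_name` and starts the resulting class. The snippet below is illustrative only, not the CLI's actual `start` body (which is not shown in this diff):

```python
# Illustrative sketch of the name-to-class resolution behind
# `prefect agent start fargate`; not the CLI implementation itself.
from prefect.utilities.serialization import from_qualified_name

_agents = {
    "fargate": "prefect.agent.fargate.FargateAgent",
    "local": "prefect.agent.local.LocalAgent",
    "kubernetes": "prefect.agent.kubernetes.KubernetesAgent",
    "nomad": "prefect.agent.nomad.NomadAgent",
}

# Resolve the registered name to its agent class and start it.
agent_cls = from_qualified_name(_agents["fargate"])
agent_cls().start()
```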