Skip to content

Commit

Permalink
Merge branch 'master' into always-install-prefect
Browse files Browse the repository at this point in the history
  • Loading branch information
wagoodman authored Nov 5, 2019
2 parents 940ad15 + 39131bb commit f0b7e1e
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Auto-label Flow environments when using Local storage - [#1696](https://github.com/PrefectHQ/prefect/pull/1696)
- Batch upload logs to Cloud in a background thread for improved performance - [#1691](https://github.com/PrefectHQ/prefect/pull/1691)
- Attempt to install prefect in any docker image (if it is not already installed) - [#1704](https://github.com/PrefectHQ/prefect/pull/1704)
- Include agent labels within each flow's configuration environment - [#1671](https://github.com/PrefectHQ/prefect/issues/1671)

### Task Library

Expand Down
4 changes: 4 additions & 0 deletions src/prefect/agent/fargate/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ def _create_task_definition(self, flow_run: GraphQLResult) -> None:
"name": "PREFECT__CLOUD__API",
"value": config.cloud.api or "https://api.prefect.io",
},
{
"name": "PREFECT__CLOUD__AGENT__LABELS",
"value": str(self.labels),
},
{"name": "PREFECT__CLOUD__USE_LOCAL_SECRETS", "value": "false"},
{"name": "PREFECT__LOGGING__LOG_TO_CLOUD", "value": "true"},
{"name": "PREFECT__LOGGING__LEVEL", "value": "DEBUG"},
Expand Down
1 change: 1 addition & 0 deletions src/prefect/agent/kubernetes/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def replace_job_spec_yaml(self, flow_run: GraphQLResult) -> dict:
env[1]["value"] = config.cloud.agent.auth_token
env[2]["value"] = flow_run.id # type: ignore
env[3]["value"] = os.getenv("NAMESPACE", "default")
env[4]["value"] = str(self.labels)

# Use image pull secrets if provided
job["spec"]["template"]["spec"]["imagePullSecrets"][0]["name"] = os.getenv(
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/agent/kubernetes/job_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ spec:
value: PREFECT__CONTEXT__FLOW_RUN_ID
- name: PREFECT__CONTEXT__NAMESPACE
value: PREFECT__CONTEXT__NAMESPACE
- name: PREFECT__CLOUD__AGENT__LABELS
value: PREFECT__CLOUD__AGENT__LABELS
- name: PREFECT__CLOUD__USE_LOCAL_SECRETS
value: "false"
- name: PREFECT__LOGGING__LOG_TO_CLOUD
Expand Down
1 change: 1 addition & 0 deletions src/prefect/agent/local/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def populate_env_vars(self, flow_run: GraphQLResult) -> dict:
return {
"PREFECT__CLOUD__API": config.cloud.api,
"PREFECT__CLOUD__AUTH_TOKEN": config.cloud.agent.auth_token,
"PREFECT__CLOUD__AGENT__LABELS": str(self.labels),
"PREFECT__CONTEXT__FLOW_RUN_ID": flow_run.id, # type: ignore
"PREFECT__CLOUD__USE_LOCAL_SECRETS": "false",
"PREFECT__LOGGING__LOG_TO_CLOUD": "true",
Expand Down
108 changes: 108 additions & 0 deletions tests/agent/test_fargate_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,113 @@ def test_deploy_flows_register_task_definition_all_args(monkeypatch, runner_toke
"command": ["/bin/sh", "-c", "prefect execute cloud-flow"],
"environment": [
{"name": "PREFECT__CLOUD__API", "value": "https://api.prefect.io"},
{"name": "PREFECT__CLOUD__AGENT__LABELS", "value": "[]"},
{"name": "PREFECT__CLOUD__USE_LOCAL_SECRETS", "value": "false"},
{"name": "PREFECT__LOGGING__LOG_TO_CLOUD", "value": "true"},
{"name": "PREFECT__LOGGING__LEVEL", "value": "DEBUG"},
{
"name": "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS",
"value": "prefect.engine.cloud.CloudFlowRunner",
},
{
"name": "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS",
"value": "prefect.engine.cloud.CloudTaskRunner",
},
],
"essential": True,
}
]
assert boto3_client.register_task_definition.call_args[1][
"requiresCompatibilities"
] == ["FARGATE"]
assert boto3_client.register_task_definition.call_args[1]["networkMode"] == "awsvpc"
assert boto3_client.register_task_definition.call_args[1]["cpu"] == "1"
assert boto3_client.register_task_definition.call_args[1]["memory"] == "2"


def test_deploy_flows_includes_agent_labels_in_environment(monkeypatch, runner_token):
boto3_client = MagicMock()

boto3_client.describe_task_definition.side_effect = ClientError({}, None)
boto3_client.run_task.return_value = {}
boto3_client.register_task_definition.return_value = {}

monkeypatch.setattr("boto3.client", MagicMock(return_value=boto3_client))

kwarg_dict = {
"taskRoleArn": "test",
"executionRoleArn": "test",
"volumes": "test",
"placementConstraints": "test",
"cpu": "1",
"memory": "2",
"tags": "test",
"pidMode": "test",
"ipcMode": "test",
"proxyConfiguration": "test",
"inferenceAccelerators": "test",
"cluster": "cluster",
"count": "test",
"startedBy": "test",
"group": "test",
"placementStrategy": "test",
"platformVersion": "test",
"networkConfiguration": {
"awsvpcConfiguration": {
"subnets": ["subnet"],
"assignPublicIp": "DISABLED",
"securityGroups": ["security_group"],
}
},
"enableECSManagedTags": "test",
"propagateTags": "test",
}

agent = FargateAgent(
aws_access_key_id="id",
aws_secret_access_key="secret",
aws_session_token="token",
region_name="region",
labels=["aws", "staging"],
**kwarg_dict
)
agent.deploy_flows(
flow_runs=[
GraphQLResult(
{
"flow": GraphQLResult(
{
"storage": Docker(
registry_url="test", image_name="name", image_tag="tag"
).serialize(),
"id": "id",
}
),
"id": "id",
}
)
]
)

assert boto3_client.describe_task_definition.called
assert boto3_client.register_task_definition.called
assert (
boto3_client.register_task_definition.call_args[1]["family"]
== "prefect-task-id"
)
assert boto3_client.register_task_definition.call_args[1][
"containerDefinitions"
] == [
{
"name": "flow",
"image": "test/name:tag",
"command": ["/bin/sh", "-c", "prefect execute cloud-flow"],
"environment": [
{"name": "PREFECT__CLOUD__API", "value": "https://api.prefect.io"},
{
"name": "PREFECT__CLOUD__AGENT__LABELS",
"value": "['aws', 'staging']",
},
{"name": "PREFECT__CLOUD__USE_LOCAL_SECRETS", "value": "false"},
{"name": "PREFECT__LOGGING__LOG_TO_CLOUD", "value": "true"},
{"name": "PREFECT__LOGGING__LEVEL", "value": "DEBUG"},
Expand Down Expand Up @@ -683,6 +790,7 @@ def test_deploy_flows_register_task_definition_no_repo_credentials(
"command": ["/bin/sh", "-c", "prefect execute cloud-flow"],
"environment": [
{"name": "PREFECT__CLOUD__API", "value": "https://api.prefect.io"},
{"name": "PREFECT__CLOUD__AGENT__LABELS", "value": "[]"},
{"name": "PREFECT__CLOUD__USE_LOCAL_SECRETS", "value": "false"},
{"name": "PREFECT__LOGGING__LOG_TO_CLOUD", "value": "true"},
{"name": "PREFECT__LOGGING__LEVEL", "value": "DEBUG"},
Expand Down
25 changes: 25 additions & 0 deletions tests/agent/test_k8s_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,31 @@ def test_k8s_agent_replace_yaml_no_pull_secrets(monkeypatch, runner_token):
assert not job["spec"]["template"]["spec"]["imagePullSecrets"][0]["name"]


def test_k8s_agent_includes_agent_labels_in_job(monkeypatch, runner_token):
k8s_config = MagicMock()
monkeypatch.setattr("kubernetes.config", k8s_config)

flow_run = GraphQLResult(
{
"flow": GraphQLResult(
{
"storage": Docker(
registry_url="test", image_name="name", image_tag="tag"
).serialize(),
"id": "id",
}
),
"id": "id",
}
)

agent = KubernetesAgent(labels=["foo", "bar"])
job = agent.replace_job_spec_yaml(flow_run)
env = job["spec"]["template"]["spec"]["containers"][0]["env"]

assert env[4]["value"] == "['foo', 'bar']"


def test_k8s_agent_generate_deployment_yaml(monkeypatch, runner_token):
k8s_config = MagicMock()
monkeypatch.setattr("kubernetes.config", k8s_config)
Expand Down
25 changes: 25 additions & 0 deletions tests/agent/test_local_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,31 @@ def test_populate_env_vars(monkeypatch, runner_token):
expected_vars = {
"PREFECT__CLOUD__API": "api",
"PREFECT__CLOUD__AUTH_TOKEN": "token",
"PREFECT__CLOUD__AGENT__LABELS": "[]",
"PREFECT__CONTEXT__FLOW_RUN_ID": "id",
"PREFECT__CLOUD__USE_LOCAL_SECRETS": "false",
"PREFECT__LOGGING__LOG_TO_CLOUD": "true",
"PREFECT__LOGGING__LEVEL": "DEBUG",
"PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudFlowRunner",
"PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS": "prefect.engine.cloud.CloudTaskRunner",
}

assert env_vars == expected_vars


def test_populate_env_vars_includes_agent_labels(monkeypatch, runner_token):
api = MagicMock()
monkeypatch.setattr("prefect.agent.local.agent.docker.APIClient", api)

with set_temporary_config({"cloud.agent.auth_token": "token", "cloud.api": "api"}):
agent = LocalAgent(labels=["42", "marvin"])

env_vars = agent.populate_env_vars(GraphQLResult({"id": "id"}))

expected_vars = {
"PREFECT__CLOUD__API": "api",
"PREFECT__CLOUD__AGENT__LABELS": "['42', 'marvin']",
"PREFECT__CLOUD__AUTH_TOKEN": "token",
"PREFECT__CONTEXT__FLOW_RUN_ID": "id",
"PREFECT__CLOUD__USE_LOCAL_SECRETS": "false",
"PREFECT__LOGGING__LOG_TO_CLOUD": "true",
Expand Down

0 comments on commit f0b7e1e

Please sign in to comment.