/
kubernetes.py
177 lines (153 loc) · 6.62 KB
/
kubernetes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from typing import Union, Iterable
import yaml
from prefect.run_configs.base import RunConfig
from prefect.utilities.filesystems import parse_path
class KubernetesRun(RunConfig):
    """Configure a flow-run to run as a Kubernetes Job.

    Kubernetes jobs are configured by filling in a job template at runtime. A
    job template can be specified either as a path (to be read in at runtime)
    or an in-memory object (which will be stored along with the flow in Prefect
    Cloud/Server). By default the job template configured on the Agent is used.

    Args:
        - job_template_path (str, optional): Path to a job template to use. If
            a local path (no file scheme, or a `file`/`local` scheme), the job
            template will be loaded on initialization and stored on the
            `KubernetesRun` object as the `job_template` field. Otherwise the
            job template will be loaded at runtime on the agent. Supported
            runtime file schemes include `s3`, `gcs`, and `agent` (for paths
            local to the runtime agent).
        - job_template (str or dict, optional): An in-memory job template to
            use. Can be either a dict, or a YAML string describing a mapping.
        - image (str, optional): The image to use
        - env (dict, optional): Additional environment variables to set on the job
        - cpu_limit (float or str, optional): The CPU limit to use for the job
        - cpu_request (float or str, optional): The CPU request to use for the job
        - memory_limit (str, optional): The memory limit to use for the job
        - memory_request (str, optional): The memory request to use for the job
        - service_account_name (str, optional): A service account name to use
            for this job. If present, overrides any service account configured
            on the agent or in the job template.
        - image_pull_secrets (list, optional): A list of image pull secrets to
            use for this job. If present, overrides any image pull secrets
            configured on the agent or in the job template. A single string is
            treated as a one-element list.
        - labels (Iterable[str], optional): an iterable of labels to apply to this
            run config. Labels are string identifiers used by Prefect Agents
            for selecting valid flow runs when polling for work
        - image_pull_policy (str, optional): The imagePullPolicy to use for the
            job. One of `"Always"`, `"IfNotPresent"`, or `"Never"`. See
            https://kubernetes.io/docs/concepts/configuration/overview/#container-images

    Raises:
        - ValueError: if both `job_template_path` and `job_template` are
            provided, if `image_pull_policy` is not one of the accepted
            values, or if the job template does not describe a mapping.
        - OSError: if a local `job_template_path` cannot be opened.
        - yaml.YAMLError: if a local job template file or a `job_template`
            string is not valid YAML.

    Examples:

    Use the defaults set on the agent:

    ```python
    flow.run_config = KubernetesRun()
    ```

    Use a local job template, which is stored along with the Flow in Prefect
    Cloud/Server:

    ```python
    flow.run_config = KubernetesRun(
        job_template_path="my_custom_template.yaml"
    )
    ```

    Use a job template stored in S3, but override the image and CPU limit:

    ```python
    flow.run_config = KubernetesRun(
        job_template_path="s3://example-bucket/my_custom_template.yaml",
        image="example/my-custom-image:latest",
        cpu_limit=2,
    )
    ```

    Use an image not tagged with `:latest`, and set the image pull policy to
    `Always`:

    ```python
    flow.run_config = KubernetesRun(
        image="example/my-custom-image:my-tag",
        image_pull_policy="Always",
    )
    ```

    Use a custom `job_template` with a custom label (or any other necessary
    changes to the default). Note: you can use the default job template, found
    at `src/prefect/agent/kubernetes/job_template.yaml` in the Prefect
    repository, as a base to build on. Once a `job_template` is specified, the
    default is no longer used:

    ```python
    flow.run_config = KubernetesRun(
        image="example/my-custom-image:my-tag",
        job_template={
            "apiVersion": "batch/v1",
            "kind": "Job",
            "spec": {
                "template": {
                    "metadata": {
                        "labels": {
                            "my-custom-label": "something"
                        }
                    },
                    "spec": {
                        "containers": [
                            {
                                "name": "flow"
                            }
                        ]
                    }
                }
            }
        }
    )
    ```
    """

    def __init__(
        self,
        *,
        job_template_path: str = None,
        job_template: Union[str, dict] = None,
        image: str = None,
        env: dict = None,
        cpu_limit: Union[float, str] = None,
        cpu_request: Union[float, str] = None,
        memory_limit: str = None,
        memory_request: str = None,
        service_account_name: str = None,
        image_pull_secrets: Iterable[str] = None,
        labels: Iterable[str] = None,
        image_pull_policy: str = None,
    ) -> None:
        super().__init__(env=env, labels=labels)

        # Validate the cheap-to-check arguments before any file I/O below.
        if job_template_path is not None and job_template is not None:
            raise ValueError(
                "Cannot provide both `job_template_path` and `job_template`"
            )

        image_pull_policies = {"Always", "IfNotPresent", "Never"}
        if (
            image_pull_policy is not None
            and image_pull_policy not in image_pull_policies
        ):
            raise ValueError(
                f"Invalid image_pull_policy {image_pull_policy!r}. "
                "Expected 'Always', 'IfNotPresent', or 'Never'"
            )

        if job_template_path is not None:
            parsed = parse_path(job_template_path)
            # Local templates are read now and stored inline on this object;
            # any other scheme is kept as a path for the agent to resolve at
            # runtime. (Per the docstring, parse_path is expected to report
            # scheme-less and `local` paths as "file" -- confirm in parse_path.)
            if parsed.scheme == "file":
                # Decode explicitly as UTF-8 rather than with the platform's
                # locale encoding, which differs e.g. on Windows.
                with open(parsed.path, encoding="utf-8") as f:
                    job_template = yaml.safe_load(f)
                job_template_path = None
        elif isinstance(job_template, str):
            # Normalize job templates to objects rather than str
            job_template = yaml.safe_load(job_template)

        # An empty YAML document loads as None, which leaves no template set.
        # Anything else must be a mapping. This is an explicit check rather
        # than an `assert`, which would be stripped under `python -O`.
        if job_template is not None and not isinstance(job_template, dict):
            raise ValueError(
                "Job template must be a mapping (a dict, or YAML describing "
                f"one), got {type(job_template).__name__}"
            )

        # Stored as strings; numeric values are accepted for convenience
        # (e.g. `cpu_limit=2` is stored as "2").
        if cpu_limit is not None:
            cpu_limit = str(cpu_limit)
        if cpu_request is not None:
            cpu_request = str(cpu_request)

        if isinstance(image_pull_secrets, str):
            # `list("name")` would split a bare string into characters; treat
            # a single string as a single secret name instead.
            image_pull_secrets = [image_pull_secrets]
        elif image_pull_secrets is not None:
            image_pull_secrets = list(image_pull_secrets)

        self.job_template_path = job_template_path
        self.job_template = job_template
        self.image = image
        self.cpu_limit = cpu_limit
        self.cpu_request = cpu_request
        self.memory_limit = memory_limit
        self.memory_request = memory_request
        self.service_account_name = service_account_name
        self.image_pull_secrets = image_pull_secrets
        self.image_pull_policy = image_pull_policy