-
Notifications
You must be signed in to change notification settings - Fork 3
/
utils.py
412 lines (342 loc) · 13.8 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
import os
import time
from pathlib import Path
from typing import Optional, Union
import lightkube
from lightkube import ApiError, Client, KubeConfig
from lightkube.resources.apps_v1 import Deployment
from lightkube.resources.core_v1 import Namespace, Node, PersistentVolumeClaim, Pod, Service
from dss.config import (
DSS_NAMESPACE,
MLFLOW_DEPLOYMENT_NAME,
NOTEBOOK_LABEL,
NOTEBOOK_PVC_NAME,
DeploymentState,
)
from dss.logger import setup_logger
# Module-level logger shared by every helper in this file; it is created by the
# project's setup_logger() helper, which is handed the path "logs/dss.log".
logger = setup_logger("logs/dss.log")
# Kubernetes resource types that make up a DSS Notebook. does_notebook_exist()
# reports a notebook as present if any one of these exists under its name.
NOTEBOOK_RESOURCES = (Service, Deployment)
# Name of the environment variable that may hold the path to the kubeconfig file.
KUBECONFIG_ENV_VAR = "DSS_KUBECONFIG"
# Kubeconfig path used when the environment variable above is not set
# (~/.dss/config, resolved against the current user's home directory).
KUBECONFIG_DEFAULT = Path.home() / ".dss/config"
class ImagePullBackOffError(Exception):
    """
    Signals that the Notebook Pod could not pull its container image.

    The message is always stored as a string, both as the first element of
    ``args`` and on the ``msg`` attribute.
    """

    # NOTE(review): presumably set so that tracebacks print the bare class name
    # without a module prefix - confirm before changing.
    __module__ = None

    def __init__(self, msg: str, *args):
        text = str(msg)
        super().__init__(text, *args)
        self.msg = text
def wait_for_deployment_ready(
    client: Client,
    namespace: str,
    deployment_name: str,
    timeout_seconds: Optional[int] = 180,
    interval_seconds: int = 10,
) -> None:
    """
    Waits for a Kubernetes deployment to be ready. Can wait indefinitely if timeout_seconds is None.

    The deployment is considered ready once its status reports as many available
    replicas as its spec requests. While waiting, the first Pod labelled
    ``NOTEBOOK_LABEL=<deployment_name>`` is inspected so that image pull failures
    abort the wait immediately instead of running into the timeout.

    Args:
        client (Client): The Kubernetes client.
        namespace (str): The namespace of the deployment.
        deployment_name (str): The name of the deployment.
        timeout_seconds (Optional[int]): Timeout in seconds, or None for no timeout.
            Defaults to 180.
        interval_seconds (int): Interval between checks in seconds. Defaults to 10.

    Raises:
        ImagePullBackOffError: If there is an issue pulling the deployment image.
        TimeoutError: If the timeout is reached before the deployment is ready.
        ApiError: If fetching the deployment fails, or listing the Pods fails with
            anything other than a 404.
    """
    logger.info(
        f"Waiting for deployment {deployment_name} in namespace {namespace} to be ready..."
    )
    start_time = time.time()
    while True:
        deployment: Deployment = client.get(Deployment, namespace=namespace, name=deployment_name)
        if deployment.status and deployment.status.availableReplicas == deployment.spec.replicas:
            logger.info(f"Deployment {deployment_name} in namespace {namespace} is ready")
            break

        try:
            pods = list(
                client.list(
                    Pod,
                    namespace=namespace,
                    labels={NOTEBOOK_LABEL: deployment_name},
                )
            )
        except ApiError as e:
            if e.response.status_code != 404:
                # Previously swallowed: that left `pods` unbound on the first
                # iteration and stale on later ones. Surface the failure instead.
                raise
            pods = []

        if pods:
            # Only the first container of the first Pod is examined.
            reason = (
                pods[0].status.containerStatuses[0].state.waiting.reason
                if pods[0].status.containerStatuses
                and pods[0].status.containerStatuses[0].state.waiting
                else "Unknown"
            )
            if reason in ["ImagePullBackOff", "ErrImagePull"]:
                raise ImagePullBackOffError(
                    f"Failed to create Deployment {deployment_name} with {reason}"
                )

        if timeout_seconds is not None and time.time() - start_time >= timeout_seconds:
            raise TimeoutError(
                f"Timeout waiting for deployment {deployment_name} in namespace {namespace} to be ready"  # noqa E501
            )
        else:
            time.sleep(interval_seconds)
            logger.info(
                f"Waiting for deployment {deployment_name} in namespace {namespace} to be ready..."
            )
def get_kubeconfig_path(
    env_var: str = KUBECONFIG_ENV_VAR,
    default_kubeconfig_location: Union[Path, str] = KUBECONFIG_DEFAULT,
) -> Path:
    """
    Resolve the location of the kubeconfig file used by DSS.

    The value of the environment variable ``env_var`` wins when it is set;
    otherwise ``default_kubeconfig_location`` is used.

    Args:
        env_var (str): The name of the environment variable to check for the kubeconfig path.
        default_kubeconfig_location (Path or str): The fallback path used when the
            environment variable is not set.

    Returns:
        Path: the path to the kubeconfig file, with a leading '~' expanded.
    """
    configured_location = os.environ.get(env_var, default_kubeconfig_location)
    # expanduser() resolves a leading '~' to the user's home directory
    return Path(configured_location).expanduser()
def get_kubeconfig(
    env_var: str = KUBECONFIG_ENV_VAR,
    default_kubeconfig_location: Union[Path, str] = KUBECONFIG_DEFAULT,
) -> lightkube.KubeConfig:
    """Load the kubeconfig used by DSS as a lightkube.KubeConfig object.

    The file is looked up through get_kubeconfig_path(), i.e.:
        * at the path held by the given environment variable, if it is set
        * otherwise at the default path given

    Any error raised by KubeConfig.from_file while loading that path propagates
    to the caller.

    Args:
        env_var (str): The name of the environment variable to check for the kubeconfig path.
        default_kubeconfig_location (Path or str): The fallback path used when the
            environment variable is not set.

    Returns:
        lightkube.KubeConfig: the kubeconfig loaded from the resolved path
    """
    return KubeConfig.from_file(get_kubeconfig_path(env_var, default_kubeconfig_location))
def save_kubeconfig(
    kubeconfig: str,
    env_var: str = KUBECONFIG_ENV_VAR,
    default_kubeconfig_location: Union[Path, str] = KUBECONFIG_DEFAULT,
) -> None:
    """
    Save the kubeconfig file to the specified location.

    The destination is resolved with get_kubeconfig_path(). Every missing
    directory on the way to the destination is created, and an existing file at
    the destination is overwritten.

    Args:
        kubeconfig (str): The kubeconfig file contents.
        env_var (str): The name of the environment variable to check for the kubeconfig path.
        default_kubeconfig_location (Path or str): The default path to the kubeconfig file if
            not specified by the environment variable.

    Returns:
        None
    """
    save_location = get_kubeconfig_path(env_var, default_kubeconfig_location)
    logger.info(f"Storing provided kubeconfig to {save_location}")

    # parents=True so that a destination nested more than one missing directory
    # deep (e.g. a custom path from the environment variable) does not fail.
    save_location.parent.mkdir(parents=True, exist_ok=True)

    with open(save_location, "w") as f:
        f.write(kubeconfig)
def get_lightkube_client() -> lightkube.Client:
    """Build a lightkube client from the kubeconfig returned by get_kubeconfig()."""
    return Client(config=get_kubeconfig())
def get_mlflow_tracking_uri() -> str:
    """Build the in-cluster MLflow tracking URI for the DSS deployment.

    The URI points at port 5000 of the cluster-local DNS name formed from
    MLFLOW_DEPLOYMENT_NAME and DSS_NAMESPACE.
    """
    service_host = f"{MLFLOW_DEPLOYMENT_NAME}.{DSS_NAMESPACE}.svc.cluster.local"
    return f"http://{service_host}:5000"
def get_service_url(name: str, namespace: str, lightkube_client: Client) -> Optional[str]:
    """
    Returns the URL of the service given the name of the service.

    Assumes that the service is exposed on its first port; the URL is built from
    the service's clusterIP and that port.

    Returns None if the service cannot be fetched or has no ports,
    logging the issue as an error.

    Args:
        name (str): The name of the service.
        namespace (str): The namespace where the service is located.
        lightkube_client (Client): The Kubernetes client.

    Returns:
        str or None: The URL of the service if found and has at least one port, otherwise None.
    """
    try:
        service = lightkube_client.get(Service, namespace=namespace, name=name)
    except ApiError as err:
        # Keep the user-facing message short; the full error goes to the debug log.
        logger.error(
            f"Failed to get the URL of notebook {name} with error code {err.status.code}.\n"
            "Check the debug logs for more details."
        )
        logger.debug(f"Failed to get the URL of notebook {name} with error: {err}")
        return None

    if not service.spec.ports:
        logger.error(f"No ports defined for the service {name} in namespace {namespace}.")
        return None

    ip = service.spec.clusterIP
    # Only the first declared port is used; additional ports are ignored.
    port = service.spec.ports[0].port
    return f"http://{ip}:{port}"
def does_notebook_exist(name: str, namespace: str, lightkube_client: Client) -> bool:
    """
    Tell whether a notebook server called `name` exists in `namespace`.

    Each resource type in NOTEBOOK_RESOURCES is looked up in turn under the given
    name; the notebook counts as existing as soon as one of them is found.

    Raises:
        ApiError: if a lookup fails with anything other than a 404.
    """
    for resource_type in NOTEBOOK_RESOURCES:
        try:
            lightkube_client.get(resource_type, namespace=namespace, name=name)
        except ApiError as err:
            if err.response.status_code != 404:
                # Not a plain "not found": let the caller deal with it
                raise err
            # 404: this resource type is absent, try the next one
            continue
        else:
            return True

    # None of the resource types exist under this name
    return False
def does_dss_pvc_exist(lightkube_client: Client) -> bool:
    """
    Tell whether the PersistentVolumeClaim named NOTEBOOK_PVC_NAME exists in DSS_NAMESPACE.

    Returns False when the lookup answers with a 404; any other ApiError is re-raised.
    """
    try:
        lightkube_client.get(
            PersistentVolumeClaim, namespace=DSS_NAMESPACE, name=NOTEBOOK_PVC_NAME
        )
    except ApiError as err:
        if err.response.status_code != 404:
            # Unexpected failure, not a missing PVC
            raise err
        return False
    return True
def does_mlflow_deployment_exist(lightkube_client: Client) -> bool:
    """
    Tell whether the Deployment named MLFLOW_DEPLOYMENT_NAME exists in DSS_NAMESPACE.

    Returns False when the lookup answers with a 404; any other ApiError is re-raised.
    """
    try:
        lightkube_client.get(Deployment, namespace=DSS_NAMESPACE, name=MLFLOW_DEPLOYMENT_NAME)
    except ApiError as err:
        if err.response.status_code != 404:
            # Unexpected failure, not a missing Deployment
            raise err
        return False
    return True
def get_labels_for_node(lightkube_client: Client) -> dict:
    """
    Fetch the labels of the single node in the cluster.

    Args:
        lightkube_client (Client): The Kubernetes client.

    Returns:
        dict: The labels found in the metadata of the cluster's only node.

    Raises:
        ValueError: if the cluster does not contain exactly one node.
    """
    cluster_nodes = list(lightkube_client.list(Node))
    if len(cluster_nodes) == 1:
        return cluster_nodes[0].metadata.labels
    raise ValueError("Expected exactly one node in the cluster")
def get_deployment_state(deployment: Deployment, lightkube_client: Client) -> DeploymentState:
    """
    Determine the state of a Kubernetes deployment, which is constrained to 0 or 1 replicas.

    The checks are applied in this order:
        1. a deletion timestamp on the deployment means REMOVING;
        2. a waiting container in any Pod matched by the deployment's selector means
           DOWNLOADING (ContainerCreating) or ERRIMAGE (ImagePullBackOff / ErrImagePull);
        3. otherwise the desired, current and available replica counts decide between
           STOPPED, STOPPING, STARTING and ACTIVE;
        4. a desired replica count other than 0 or 1 yields UNKNOWN.

    Args:
        deployment (Deployment): The deployment object.
        lightkube_client (Client): The Kubernetes client.

    Returns:
        DeploymentState: The state of the deployment as an enumeration.
    """
    # Extract replica counts from the deployment; a missing status or count is
    # treated as zero replicas.
    status = deployment.status
    desired_replicas = deployment.spec.replicas if deployment.spec.replicas is not None else 0
    current_replicas = status.replicas if status and status.replicas is not None else 0

    # Check for deployment deletion
    if deployment.metadata.deletionTimestamp:
        return DeploymentState.REMOVING

    # Check pod statuses for image pulling issues and container creation
    pods = lightkube_client.list(
        Pod, namespace=deployment.metadata.namespace, labels=deployment.spec.selector.matchLabels
    )
    for pod in pods:
        container_statuses = (
            pod.status.containerStatuses if pod.status.containerStatuses is not None else []
        )
        for container_status in container_statuses:
            waiting = container_status.state.waiting
            if not waiting:
                continue
            if waiting.reason in ["ContainerCreating"]:
                return DeploymentState.DOWNLOADING
            if waiting.reason in ["ImagePullBackOff", "ErrImagePull"]:
                return DeploymentState.ERRIMAGE

    # Determine the state based on replica counts
    if desired_replicas == 0:
        if current_replicas == 0:
            return DeploymentState.STOPPED
        return DeploymentState.STOPPING

    if desired_replicas == 1:
        if current_replicas == 0:
            return DeploymentState.STARTING
        # The replica exists; it only counts as ACTIVE once it is available
        available_replicas = (
            status.availableReplicas if status and status.availableReplicas is not None else 0
        )
        if available_replicas < 1:
            return DeploymentState.STARTING
        return DeploymentState.ACTIVE

    # More than one desired replica is outside what DSS manages. The original
    # code evaluated this value without returning it, so callers received None.
    return DeploymentState.UNKNOWN
def does_namespace_exist(lightkube_client: Client, namespace: str) -> bool:
    """
    Tell whether the given namespace exists.

    Returns:
        True if the given namespace exists and False if the request returns a 404.

    Raises:
        ApiError if the request raises an error that is not a 404.
    """
    try:
        lightkube_client.get(Namespace, name=namespace)
    except ApiError as err:
        if err.response.status_code != 404:
            raise err
        # 404: the namespace was not found
        return False
    return True
def wait_for_namespace_to_be_deleted(
    lightkube_client: Client,
    namespace: str,
    interval_seconds: int = 10,
) -> None:
    """
    Waits for a namespace to be deleted.

    Polls does_namespace_exist() for the given namespace, sleeping
    `interval_seconds` between checks, and returns once the namespace is gone.
    There is no timeout: the call blocks for as long as the namespace exists.

    Args:
        lightkube_client (Client): The Kubernetes client.
        namespace (str): The namespace being deleted.
        interval_seconds (int): Interval between checks in seconds. Defaults to 10.

    Returns:
        None

    Raises:
        ApiError if helper does_namespace_exist() raises an ApiError.
    """
    while True:
        logger.info(f"Waiting for namespace {namespace} to be deleted...")
        # Check the namespace that was passed in; the original polled the
        # DSS_NAMESPACE constant and ignored this argument.
        if not does_namespace_exist(lightkube_client, namespace):
            break
        time.sleep(interval_seconds)