-
Notifications
You must be signed in to change notification settings - Fork 6
/
charm.py
executable file
·240 lines (210 loc) · 9.78 KB
/
charm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
# See LICENSE file for licensing details.
#
import logging
from charmed_kubeflow_chisme.exceptions import ErrorWithStatus, GenericCharmRuntimeError
from charmed_kubeflow_chisme.kubernetes import KubernetesResourceHandler
from charmed_kubeflow_chisme.lightkube.batch import delete_many
from charms.grafana_k8s.v0.grafana_dashboard import GrafanaDashboardProvider
from charms.kubeflow_dashboard.v0.kubeflow_dashboard_links import (
DashboardLink,
KubeflowDashboardLinksRequirer,
)
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
from lightkube import ApiError
from lightkube.generic_resource import load_in_cluster_generic_resources
from ops.charm import CharmBase
from ops.main import main
from ops.model import ActiveStatus, MaintenanceStatus, WaitingStatus
# Jinja2 templates for the charm's Kubernetes resources; they are rendered with the
# charm's context dict and applied by the K8S resource handler.
K8S_RESOURCE_FILES = [
    f"src/templates/{template_name}.yaml.j2"
    for template_name in (
        "rbac_manifests",
        "secret",
        "deployment",
        "validatingwebhookconfiguration",
        "service",
    )
]
# CRD templates are handled by a separate resource handler so they can be applied
# (and deleted) as their own batch.
CRD_RESOURCE_FILES = ["src/templates/crds_manifests.yaml.j2"]

# Ports are kept as strings: they are interpolated into the template context and
# into the Prometheus scrape target string.
METRICS_PATH = "/metrics"
METRICS_PORT = "8080"
WEBHOOK_PORT = "443"
WEBHOOK_TARGET_PORT = "9443"

logger = logging.getLogger(__name__)
class TrainingOperatorCharm(CharmBase):
    """A Juju Charm for Training Operator.

    The charm renders the Jinja2 templates listed in ``CRD_RESOURCE_FILES`` and
    ``K8S_RESOURCE_FILES`` with a context built from charm config and model data,
    and applies/deletes them with two ``KubernetesResourceHandler`` instances.
    It also wires up a Grafana dashboard provider, a Prometheus scrape job and a
    Kubeflow dashboard documentation link.
    """

    def __init__(self, *args):
        super().__init__(*args)

        self.logger = logging.getLogger(__name__)

        self._image = self.config["training-operator-image"]
        self._name = self.model.app.name
        self._namespace = self.model.name
        self._lightkube_field_manager = "lightkube"
        # Values rendered into every manifest template.
        self._context = {
            "namespace": self._namespace,
            "app_name": self._name,
            "training_operator_image": self._image,
            "metrics_port": METRICS_PORT,
            "webhook_port": WEBHOOK_PORT,
            "webhook_target_port": WEBHOOK_TARGET_PORT,
        }

        # Resource handlers are created lazily by the properties below.
        self._k8s_resource_handler = None
        self._crd_resource_handler = None

        self.dashboard_provider = GrafanaDashboardProvider(self)

        self.framework.observe(self.on.upgrade_charm, self._on_upgrade)
        self.framework.observe(self.on.config_changed, self._on_event)
        self.framework.observe(self.on.leader_elected, self._on_event)
        self.framework.observe(self.on.install, self._on_install)
        self.framework.observe(self.on.remove, self._on_remove)

        # Add documentation link to the dashboard
        self.kubeflow_dashboard_sidebar = KubeflowDashboardLinksRequirer(
            charm=self,
            relation_name="dashboard-links",
            dashboard_links=[
                DashboardLink(
                    text="Kubeflow Training Operator Documentation",
                    link="https://www.kubeflow.org/docs/components/training/",
                    desc="Documentation for Kubeflow Training Operator",
                    location="documentation",
                ),
            ],
        )

        # The target is the Service (applied with service.yaml.j2) and the name has the following
        # format: app-name-workload.namespace.svc:metrics_port
        self.prometheus_provider = MetricsEndpointProvider(
            charm=self,
            relation_name="metrics-endpoint",
            jobs=[
                {
                    "metrics_path": METRICS_PATH,
                    "static_configs": [
                        {
                            "targets": [
                                f"{self._name}-workload.{self._namespace}.svc:{METRICS_PORT}"
                            ]
                        }
                    ],
                }
            ],
        )

    @property
    def k8s_resource_handler(self):
        """Return the handler for the K8S_RESOURCE_FILES templates, creating it on first use.

        On creation, generic resources known to the cluster are loaded into the handler's
        lightkube client.
        """
        if not self._k8s_resource_handler:
            self._k8s_resource_handler = KubernetesResourceHandler(
                field_manager=self._lightkube_field_manager,
                template_files=K8S_RESOURCE_FILES,
                context=self._context,
                logger=self.logger,
            )
            load_in_cluster_generic_resources(self._k8s_resource_handler.lightkube_client)
        return self._k8s_resource_handler

    @k8s_resource_handler.setter
    def k8s_resource_handler(self, handler: KubernetesResourceHandler):
        self._k8s_resource_handler = handler

    @property
    def crd_resource_handler(self):
        """Return the handler for the CRD_RESOURCE_FILES templates, creating it on first use.

        On creation, generic resources known to the cluster are loaded into the handler's
        lightkube client.
        """
        if not self._crd_resource_handler:
            self._crd_resource_handler = KubernetesResourceHandler(
                field_manager=self._lightkube_field_manager,
                template_files=CRD_RESOURCE_FILES,
                context=self._context,
                logger=self.logger,
            )
            load_in_cluster_generic_resources(self._crd_resource_handler.lightkube_client)
        return self._crd_resource_handler

    @crd_resource_handler.setter
    def crd_resource_handler(self, handler: KubernetesResourceHandler):
        self._crd_resource_handler = handler

    def _check_leader(self):
        """Check if this unit is a leader.

        Raises:
            ErrorWithStatus: with WaitingStatus when this unit is not the leader.
        """
        if not self.unit.is_leader():
            self.logger.info("Not a leader, skipping setup")
            raise ErrorWithStatus("Waiting for leadership", WaitingStatus)

    def _check_and_report_k8s_conflict(self, error):
        """Return True (and log a warning) if error status code is 409 (conflict), else False."""
        if error.status.code == 409:
            self.logger.warning(f"Encountered a conflict: {str(error)}")
            return True
        return False

    def _apply_with_conflict_handling(
        self, get_handler, resource_kind: str, force_conflicts: bool
    ) -> None:
        """Apply one handler's resources, optionally retrying with forced conflict resolution.

        Args:
            get_handler: zero-argument callable returning the resource handler. It is called
                         inside the ``try`` so errors raised while lazily creating the handler
                         are handled like apply errors.
            resource_kind (str): label used in status/log/error messages ("CRD" or "K8S").
            force_conflicts (bool): retry with ``apply(force=True)`` when the first apply
                                    fails with a 409 conflict.

        Raises:
            GenericCharmRuntimeError: if the apply fails and is not a retryable conflict, or
                                      if the forced retry also fails.
        """
        try:
            get_handler().apply()
        except ApiError as error:
            # The conflict check runs first so a 409 is always logged, even when not forcing.
            if not (self._check_and_report_k8s_conflict(error) and force_conflicts):
                raise GenericCharmRuntimeError(
                    f"{resource_kind} resources creation failed"
                ) from error
            # conflict detected: re-apply with forced conflict resolution
            self.unit.status = MaintenanceStatus(f"Force applying {resource_kind} resources")
            self.logger.warning(f"Applying {resource_kind} resources with conflict resolution")
            try:
                get_handler().apply(force=force_conflicts)
            except ApiError as force_error:
                # Previously this ApiError escaped unwrapped; report it like the first failure.
                raise GenericCharmRuntimeError(
                    f"{resource_kind} resources creation failed"
                ) from force_error

    def _apply_k8s_resources(self, force_conflicts: bool = False) -> None:
        """Applies K8S resources.

        CRD resources are applied first, then the remaining K8S resources.

        Args:
            force_conflicts (bool): *(optional)* Will "force" apply requests causing conflicting
                                    fields to change ownership to the field manager used in this
                                    charm.
                                    NOTE: This will only be used if initial regular apply() fails.

        Raises:
            GenericCharmRuntimeError: if applying either set of resources fails.
        """
        self.unit.status = MaintenanceStatus("Creating K8S resources")
        self._apply_with_conflict_handling(
            lambda: self.crd_resource_handler, "CRD", force_conflicts
        )
        self._apply_with_conflict_handling(
            lambda: self.k8s_resource_handler, "K8S", force_conflicts
        )
        self.model.unit.status = MaintenanceStatus("K8S resources created")

    # TODO: force_conflicts=True due to
    # https://github.com/canonical/training-operator/issues/104
    # Remove this if [this pr](https://github.com/canonical/charmed-kubeflow-chisme/pull/65)
    # merges.
    def _on_event(self, _, force_conflicts: bool = True) -> None:
        """Perform all required actions the Charm.

        Args:
            force_conflicts (bool): Should only be used when need to resolved conflicts on K8S
                                    resources.
        """
        try:
            self._check_leader()
            self._apply_k8s_resources(force_conflicts=force_conflicts)
        except ErrorWithStatus as error:
            self.model.unit.status = error.status
            return

        self.model.unit.status = ActiveStatus()

    def _on_install(self, _):
        """Perform installation only actions."""
        # apply K8S resources to speed up deployment
        # TODO: force_conflicts=True due to
        # https://github.com/canonical/training-operator/issues/104
        # Remove this if [this pr](https://github.com/canonical/charmed-kubeflow-chisme/pull/65)
        # merges.
        self._apply_k8s_resources(force_conflicts=True)

    def _on_upgrade(self, _):
        """Perform upgrade steps."""
        # force conflict resolution in K8S resources update
        # TODO: Remove force_conflicts if
        # [this pr](https://github.com/canonical/charmed-kubeflow-chisme/pull/65) merges.
        self._on_event(_, force_conflicts=True)

    def _delete_ignoring_missing(self, client, manifests) -> None:
        """Delete ``manifests`` with ``client``, tolerating a 404 (resources not found).

        Raises:
            ApiError: for any failure other than 404, after logging it.
        """
        try:
            delete_many(client, manifests)
        except ApiError as error:
            # do not log/report when resources were not found
            if error.status.code != 404:
                self.logger.error(f"Failed to delete K8S resources, with error: {error}")
                raise

    def _on_remove(self, _):
        """Remove all resources (CRDs first, then the remaining K8S resources)."""
        self.unit.status = MaintenanceStatus("Removing K8S resources")
        k8s_resources_manifests = self.k8s_resource_handler.render_manifests()
        crd_resources_manifests = self.crd_resource_handler.render_manifests()
        # Each batch is deleted independently: a 404 while deleting the CRDs must not
        # prevent the K8S resources from being deleted (previously it silently did).
        self._delete_ignoring_missing(
            self.crd_resource_handler.lightkube_client, crd_resources_manifests
        )
        self._delete_ignoring_missing(
            self.k8s_resource_handler.lightkube_client, k8s_resources_manifests
        )
        self.unit.status = MaintenanceStatus("K8S resources removed")
if __name__ == "__main__":
    # Entry point when Juju executes this file: hand the charm class to the ops framework.
    main(charm_class=TrainingOperatorCharm)