-
Notifications
You must be signed in to change notification settings - Fork 35
/
client.go
349 lines (302 loc) · 12.3 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
// Copyright (c) Edgeless Systems GmbH.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
package helm
import (
"context"
"encoding/base64"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/edgelesssys/marblerun/util/k8sutil"
"github.com/gofrs/flock"
"gopkg.in/yaml.v3"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/chartutil"
"helm.sh/helm/v3/pkg/cli"
"helm.sh/helm/v3/pkg/getter"
"helm.sh/helm/v3/pkg/repo"
"helm.sh/helm/v3/pkg/strvals"
)
// Options contains the values to set in the helm chart.
type Options struct {
Hostname string
PCCSURL string
UseSecureCert string
AccessToken string
SGXResourceKey string
WebhookSettings []string
SimulationMode bool
CoordinatorRESTPort int
CoordinatorGRPCPort int
}
// Client provides functionality to install and uninstall Helm charts.
type Client struct {
namespace string
config *action.Configuration
settings *cli.EnvSettings
}
// New initializes a new helm client.
func New(namespace string) (*Client, error) {
settings := cli.New()
// settings.KubeConfig = kubeConfigPath
actionConfig := &action.Configuration{}
if err := actionConfig.Init(settings.RESTClientGetter(), namespace, os.Getenv("HELM_DRIVER"), nopLog); err != nil {
return nil, err
}
return &Client{
namespace: namespace,
config: actionConfig,
settings: settings,
}, nil
}
// GetChart loads the helm chart from the given path or from the edgeless helm repo.
// This will add the edgeless helm repo if it is not already present on disk.
func (c *Client) GetChart(chartPath, version string) (*chart.Chart, error) {
if chartPath == "" {
// No chart was specified -> add or update edgeless helm repo
installer := action.NewInstall(c.config)
installer.ChartPathOptions.Version = version
err := c.getRepo(repoName, repoURI)
if err != nil {
return nil, fmt.Errorf("adding edgeless helm repo: %w", err)
}
chartPath, err = installer.ChartPathOptions.LocateChart(chartName, c.settings)
if err != nil {
return nil, fmt.Errorf("locating chart: %w", err)
}
}
chart, err := loader.Load(chartPath)
if err != nil {
return nil, fmt.Errorf("loading chart from path %q: %w", chartPath, err)
}
return chart, nil
}
// UpdateValues merges the provided options with the default values of the chart.
func UpdateValues(options Options, chartValues map[string]interface{}) (map[string]interface{}, error) {
stringValues := []string{}
stringValues = append(stringValues, fmt.Sprintf("coordinator.meshServerPort=%d", options.CoordinatorGRPCPort))
stringValues = append(stringValues, fmt.Sprintf("coordinator.clientServerPort=%d", options.CoordinatorRESTPort))
if options.SimulationMode {
// simulation mode, disable tolerations and resources, set simulation to true
stringValues = append(stringValues,
fmt.Sprintf("tolerations=%s", "null"),
fmt.Sprintf("coordinator.simulation=%t", options.SimulationMode),
fmt.Sprintf("coordinator.resources.limits=%s", "null"),
fmt.Sprintf("coordinator.hostname=%s", options.Hostname),
fmt.Sprintf("dcap=%s", "null"),
)
} else {
stringValues = append(stringValues,
fmt.Sprintf("coordinator.hostname=%s", options.Hostname),
fmt.Sprintf("dcap.pccsUrl=%s", options.PCCSURL),
fmt.Sprintf("dcap.useSecureCert=%s", options.UseSecureCert),
)
// Helms value merge function will overwrite any preset values for "tolerations" if we set new ones here
// To avoid this we set the new toleration for "resourceKey" and copy all preset tolerations
needToleration := true
idx := 0
for _, toleration := range chartValues["tolerations"].([]interface{}) {
if key, ok := toleration.(map[string]interface{})["key"]; ok {
if key == options.SGXResourceKey {
needToleration = false
}
stringValues = append(stringValues, fmt.Sprintf("tolerations[%d].key=%v", idx, key))
}
if operator, ok := toleration.(map[string]interface{})["operator"]; ok {
stringValues = append(stringValues, fmt.Sprintf("tolerations[%d].operator=%v", idx, operator))
}
if effect, ok := toleration.(map[string]interface{})["effect"]; ok {
stringValues = append(stringValues, fmt.Sprintf("tolerations[%d].effect=%v", idx, effect))
}
if value, ok := toleration.(map[string]interface{})["value"]; ok {
stringValues = append(stringValues, fmt.Sprintf("tolerations[%d].value=%v", idx, value))
}
if tolerationSeconds, ok := toleration.(map[string]interface{})["tolerationSeconds"]; ok {
stringValues = append(stringValues, fmt.Sprintf("tolerations[%d].tolerationSeconds=%v", idx, tolerationSeconds))
}
idx++
}
if needToleration {
stringValues = append(stringValues,
fmt.Sprintf("tolerations[%d].key=%s", idx, options.SGXResourceKey),
fmt.Sprintf("tolerations[%d].operator=Exists", idx),
fmt.Sprintf("tolerations[%d].effect=NoSchedule", idx),
)
}
}
// Configure enterprise access token
if options.AccessToken != "" {
coordinatorCfg, ok := chartValues["coordinator"].(map[string]interface{})
if !ok {
return nil, errors.New("coordinator not found in chart values")
}
repository, ok := coordinatorCfg["repository"].(string)
if !ok {
return nil, errors.New("coordinator.registry not found in chart values")
}
token := options.AccessToken
// If the token is a PAT, we need to encode it as base64(user:password)
if strings.HasPrefix(token, "ghp_") {
token = base64.StdEncoding.EncodeToString([]byte(token + ":" + token))
}
pullSecret := fmt.Sprintf(`{"auths":{"%s":{"auth":"%s"}}}`, repository, token)
stringValues = append(stringValues, fmt.Sprintf("pullSecret.token=%s", base64.StdEncoding.EncodeToString([]byte(pullSecret))))
}
if len(options.WebhookSettings) > 0 {
stringValues = append(stringValues, options.WebhookSettings...)
stringValues = append(stringValues, fmt.Sprintf("marbleInjector.resourceKey=%s", options.SGXResourceKey))
}
finalValues := map[string]interface{}{}
for _, val := range stringValues {
if err := strvals.ParseInto(val, finalValues); err != nil {
return nil, fmt.Errorf("parsing value %q into final values: %w", val, err)
}
}
if !options.SimulationMode {
setSGXValues(options.SGXResourceKey, finalValues, chartValues)
}
return finalValues, nil
}
// Install installs MarbleRun using the provided chart and values.
func (c *Client) Install(ctx context.Context, wait bool, chart *chart.Chart, values map[string]interface{}) error {
installer := action.NewInstall(c.config)
installer.Namespace = c.namespace
installer.ReleaseName = release
installer.CreateNamespace = true
installer.Wait = wait
installer.Timeout = time.Minute * 5
if err := chartutil.ValidateAgainstSchema(chart, values); err != nil {
return err
}
_, err := installer.RunWithContext(ctx, chart, values)
return err
}
// Uninstall removes the MarbleRun deployment from the cluster.
func (c *Client) Uninstall(wait bool) error {
uninstaller := action.NewUninstall(c.config)
uninstaller.Wait = wait
uninstaller.Timeout = time.Minute * 5
_, err := uninstaller.Run(release)
return err
}
// getRepo is a simplified repo_add from helm cli to add MarbleRun repo if it does not yet exist.
// To make sure we use the newest chart we always download the needed index file.
func (c *Client) getRepo(name string, url string) error {
repoFile := c.settings.RepositoryConfig
// Ensure the file directory exists as it is required for file locking
err := os.MkdirAll(filepath.Dir(repoFile), 0o755)
if err != nil && !os.IsExist(err) {
return fmt.Errorf("creating helm repository directory: %w", err)
}
// Acquire a file lock for process synchronization
fileLock := flock.New(strings.Replace(repoFile, filepath.Ext(repoFile), ".lock", 1))
lockCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
locked, err := fileLock.TryLockContext(lockCtx, time.Second)
if err == nil && locked {
defer func() { _ = fileLock.Unlock() }()
}
if err != nil {
return fmt.Errorf("acquiring helm repository lock: %w", err)
}
b, err := os.ReadFile(repoFile)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("reading helm repository file: %w", err)
}
var f repo.File
if err := yaml.Unmarshal(b, &f); err != nil {
return err
}
entry := &repo.Entry{
Name: name,
URL: url,
}
r, err := repo.NewChartRepository(entry, getter.All(c.settings))
if err != nil {
return fmt.Errorf("creating helm repository: %w", err)
}
if _, err := r.DownloadIndexFile(); err != nil {
return fmt.Errorf("downloading helm repository index file: %w", err)
}
f.Update(entry)
if err := f.WriteFile(repoFile, 0o644); err != nil {
return fmt.Errorf("writing helm repository file: %w", err)
}
return nil
}
// setSGXValues sets the needed values for the coordinator as a map[string]interface.
// strvals can't parse keys which include dots, e.g. setting as a resource limit key "sgx.intel.com/epc" will lead to errors.
func setSGXValues(resourceKey string, values, chartValues map[string]interface{}) {
values["coordinator"].(map[string]interface{})["resources"] = map[string]interface{}{
"limits": map[string]interface{}{},
"requests": map[string]interface{}{},
}
var needNewLimit bool
limit := k8sutil.GetEPCResourceLimit(resourceKey)
// remove all previously set sgx resource limits
if presetLimits, ok := chartValues["coordinator"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{}); ok {
for oldResourceKey := range presetLimits {
// Make sure the key we delete is an unwanted sgx resource and not a custom resource or common resource (cpu, memory, etc.)
if needsDeletion(oldResourceKey, resourceKey) {
values["coordinator"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{})[oldResourceKey] = nil
needNewLimit = true
}
}
}
// remove all previously set sgx resource requests
if presetLimits, ok := chartValues["coordinator"].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{}); ok {
for oldResourceKey := range presetLimits {
if needsDeletion(oldResourceKey, resourceKey) {
values["coordinator"].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{})[oldResourceKey] = nil
needNewLimit = true
}
}
}
// Set the new sgx resource limit, kubernetes will automatically set a resource request equal to the limit
if needNewLimit {
values["coordinator"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{})[resourceKey] = limit
}
// Make sure provision and enclave bit is set if the Intel plugin is used
if resourceKey == k8sutil.IntelEpc.String() {
values["coordinator"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{})[k8sutil.IntelProvision.String()] = 1
values["coordinator"].(map[string]interface{})["resources"].(map[string]interface{})["limits"].(map[string]interface{})[k8sutil.IntelEnclave.String()] = 1
}
}
// needsDeletion checks if an existing key of a helm chart should be deleted.
// Choice is based on the resource key of the used SGX device plugin.
func needsDeletion(existingKey, sgxKey string) bool {
sgxResources := []string{
k8sutil.AlibabaEpc.String(), k8sutil.AzureEpc.String(), k8sutil.IntelEpc.String(),
k8sutil.IntelProvision.String(), k8sutil.IntelEnclave.String(),
}
switch sgxKey {
case k8sutil.AlibabaEpc.String(), k8sutil.AzureEpc.String():
// Delete all non Alibaba/Azure SGX resources depending on the used SGX device plugin
return sgxKey != existingKey && keyInList(existingKey, sgxResources)
case k8sutil.IntelEpc.String():
// Delete all non Intel SGX resources depending on the used SGX device plugin
// Keep Intel provision and enclave bit
return keyInList(existingKey, []string{k8sutil.AlibabaEpc.String(), k8sutil.AzureEpc.String()})
default:
// Either no SGX plugin or a custom SGX plugin is used
// Delete all known SGX resources
return keyInList(existingKey, sgxResources)
}
}
func keyInList(key string, list []string) bool {
for _, l := range list {
if key == l {
return true
}
}
return false
}
func nopLog(_ string, _ ...interface{}) {}