generated from broadinstitute/golang-project-template
-
Notifications
You must be signed in to change notification settings - Fork 1
/
kubecfg.go
400 lines (349 loc) · 14.1 KB
/
kubecfg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
package kubecfg
import (
"context"
"encoding/base64"
"fmt"
"github.com/pkg/errors"
"path"
"sort"
"sync"
"time"
container "cloud.google.com/go/container/apiv1"
"cloud.google.com/go/container/apiv1/containerpb"
"github.com/broadinstitute/thelma/internal/thelma/state/api/terra"
"github.com/broadinstitute/thelma/internal/thelma/utils"
"github.com/broadinstitute/thelma/internal/thelma/utils/flock"
"github.com/rs/zerolog/log"
"golang.org/x/oauth2"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
const (
	// defaultAuthInfo is the name of the AuthInfo entry in the kube config file that every generated
	// context authenticates with.
	defaultAuthInfo = "default"

	// ctxDelimiter joins environment/cluster/release names when building context names in the kubecfg.
	// Underscores are not valid characters in env or cluster names, which helps keep context names unique.
	ctxDelimiter = "_"
)
// kubectx is the concrete Kubectx implementation: a named context entry in the kubecfg file, together with
// the namespace and cluster that context targets.
type kubectx struct {
	contextName string        // name of the context entry in the kubecfg file
	namespace   string        // namespace kubectl commands run in (empty for cluster-level contexts)
	cluster     terra.Cluster // cluster kubectl commands are executed against
}

// Kubectx describes a run context for a kubectl command.
type Kubectx interface {
	// ContextName is the name of the run context in the kubecfg file.
	ContextName() string
	// Namespace is the namespace the kubectl command should run against.
	Namespace() string
}

// compile-time check that kubectx satisfies Kubectx
var _ Kubectx = kubectx{}

// ContextName implements Kubectx.
func (k kubectx) ContextName() string { return k.contextName }

// Namespace implements Kubectx.
func (k kubectx) Namespace() string { return k.namespace }
// ReleaseKtx pairs a terra.Release with the Kubectx that kubectl commands targeting it should use.
type ReleaseKtx struct {
	Release terra.Release // the release itself
	Kubectx Kubectx       // run context for commands against Release
}
// Kubeconfig manages entries in a `kubectl` config file (traditionally ~/.kube/config) for Terra environments,
// releases, and clusters, creating a context entry for each.
//
// It works like `gcloud container clusters get-credentials`, except:
//   - Users don't have to specify a project or location (because thelma already knows where clusters live).
//   - Context entries are named in a more user-friendly fashion. For example, the context for the "alpha"
//     environment is called `alpha`, instead of `gke_broad-dsde-alpha_us-central1-a_terra-alpha`. This makes it
//     possible to quickly run a kubectl command against the alpha environment using `kubectl --context alpha`
//     (no need to specify a namespace).
//
// Read more about kubectl contexts here:
// https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/#define-clusters-users-and-contexts
//
// Q: Why not just shell out to gcloud?
// Because `gcloud` is a big ol' Python app. Thelma is designed to run on developer laptops, and depending on
// users having the correct version of gcloud and Python installed is more brittle than just generating GKE
// credentials ourselves.
type Kubeconfig interface {
	// ConfigFile returns the path to the kubeconfig file where context entries are generated.
	ConfigFile() string
	// ForRelease returns the Kubectx to use for executing commands against the given release.
	ForRelease(terra.Release) (Kubectx, error)
	// ForReleases returns a Kubectx for each given release, paired with that release.
	ForReleases(releases ...terra.Release) ([]ReleaseKtx, error)
	// ForEnvironment returns the Kubectxs needed to reach every release in an environment.
	ForEnvironment(env terra.Environment) ([]Kubectx, error)
	// ForCluster returns the Kubectx to use for executing commands against the cluster itself,
	// without any awareness of environments or releases.
	ForCluster(cluster terra.Cluster) (Kubectx, error)
}
// New returns a Kubeconfig that writes context entries to the given kubeconfig file.
//
// gkeClient is used to look up cluster details (CA certificates), and tokenSource supplies the access token
// that is written into the file. Writers are serialized through a file lock that lives next to the kubeconfig
// (for file "dir/config" the lock file is "dir/.config.lk"), configured with a 30s Timeout and a 100ms
// RetryInterval.
func New(file string, gkeClient *container.ClusterManagerClient, tokenSource oauth2.TokenSource) Kubeconfig {
	dir := path.Dir(file)
	lockName := "." + path.Base(file) + ".lk"

	locker := flock.NewLocker(path.Join(dir, lockName), func(opts *flock.Options) {
		opts.Timeout = 30 * time.Second
		opts.RetryInterval = 100 * time.Millisecond
	})

	cfg := &kubeconfig{
		file:        file,
		gkeClient:   gkeClient,
		tokenSource: tokenSource,
		locker:      locker,
		writtenCtxs: make(map[string]struct{}),
	}
	return cfg
}
// kubeconfig is the Kubeconfig implementation returned by New.
type kubeconfig struct {
	file        string                          // kubeconfig file auth creds are written to (eg. ~/.thelma/config/kubeconfig)
	gkeClient   *container.ClusterManagerClient // Google Kubernetes Engine client, used to fetch cluster details
	tokenSource oauth2.TokenSource              // source of the access token written into the kubeconfig file
	locker      flock.Locker                    // file lock keeping concurrent kubeconfig updates from stomping on each other
	writtenCtxs map[string]struct{}             // names of kubectxs this instance has already written
	mutex       sync.Mutex                      // guards reads and writes of writtenCtxs
}
// ForRelease implements Kubeconfig by ensuring the release's context exists in the kubecfg file.
func (c *kubeconfig) ForRelease(release terra.Release) (Kubectx, error) {
	ktx, err := c.addRelease(release)
	return ktx, err
}
// ForReleases implements Kubeconfig. Releases are processed in order; the first error aborts the whole call
// and no partial result is returned.
func (c *kubeconfig) ForReleases(releases ...terra.Release) ([]ReleaseKtx, error) {
	var pairs []ReleaseKtx
	for i := range releases {
		release := releases[i]
		ktx, err := c.ForRelease(release)
		if err != nil {
			return nil, err
		}
		pairs = append(pairs, ReleaseKtx{Release: release, Kubectx: ktx})
	}
	return pairs, nil
}
// ForEnvironment implements Kubeconfig; see addAllReleases for which contexts are generated.
func (c *kubeconfig) ForEnvironment(env terra.Environment) ([]Kubectx, error) {
	ktxs, err := c.addAllReleases(env)
	return ktxs, err
}
// ForCluster implements Kubeconfig; see addCluster for the generated context.
func (c *kubeconfig) ForCluster(cluster terra.Cluster) (Kubectx, error) {
	ktx, err := c.addCluster(cluster)
	return ktx, err
}
// ConfigFile implements Kubeconfig, returning the kubeconfig path this instance was constructed with.
func (c *kubeconfig) ConfigFile() string { return c.file }
// addAllReleases updates the kubecfg to include a context for every release in an environment, and returns
// those contexts sorted by context name.
//
// This means adding the environment's default context (see addEnvironmentDefault) and, for every release
// outside the environment's default cluster, a separate context whose name comes from contextNameForRelease
// ("<environment>_<release>").
//
// For example, when called for Terra's alpha environment, addAllReleases will add a default context called
// "alpha", and - assuming DataRepo's release is named "datarepo" - a context called "alpha_datarepo" that
// points at DataRepo's alpha cluster and that release's namespace.
func (c *kubeconfig) addAllReleases(env terra.Environment) ([]Kubectx, error) {
	defaultCtx, err := c.addEnvironmentDefault(env)
	if err != nil {
		return nil, err
	}
	kubectxs := []Kubectx{defaultCtx}

	// the default cluster does not change while iterating, so look its name up once
	defaultClusterName := env.DefaultCluster().Name()
	for _, release := range env.Releases() {
		if release.Cluster().Name() == defaultClusterName {
			// already reachable through the environment's default context
			continue
		}
		releaseCtx, err := c.addRelease(release)
		if err != nil {
			return nil, err
		}
		kubectxs = append(kubectxs, releaseCtx)
	}

	// sort by context name for predictability and easy testing
	sort.Slice(kubectxs, func(i, j int) bool {
		return kubectxs[i].ContextName() < kubectxs[j].ContextName()
	})
	return kubectxs, nil
}
// addEnvironmentDefault ensures the kubecfg contains the environment's default context, and returns it.
//
// The context is named after the environment and points at the environment's namespace and default cluster.
// For example, for Terra's alpha environment it is called "alpha", points at the "terra-alpha" namespace,
// and targets the terra-alpha cluster in the broad-dsde-alpha project.
//
// Releases that live outside the environment's default cluster (eg. "datarepo") do NOT get a context here;
// call addAllReleases() to generate contexts for those as well.
func (c *kubeconfig) addEnvironmentDefault(env terra.Environment) (Kubectx, error) {
	envCtx := kubectxForEnvironment(env)
	if err := c.writeContextIfNeeded(envCtx); err != nil {
		return kubectx{}, err
	}
	return envCtx, nil
}
// addRelease ensures the kubecfg contains a context for this release, and returns it.
//
// The context targets the release's cluster and namespace, and is named by contextNameForRelease:
//   - an app release in its environment's default cluster shares the environment's context name (eg. "alpha")
//   - any other app release gets "<environment>_<release>"
//   - a cluster release gets "<cluster>_<release>"
func (c *kubeconfig) addRelease(release terra.Release) (Kubectx, error) {
	releaseCtx := kubectxForRelease(release)
	if err := c.writeContextIfNeeded(releaseCtx); err != nil {
		return kubectx{}, err
	}
	return releaseCtx, nil
}
// addCluster ensures the kubecfg contains a context for the given cluster, and returns it.
// The context has no namespace set, so kubectl falls back to the cluster's default namespace.
//
// For example, for Terra's alpha cluster this adds a context called "cluster_terra-alpha" that targets the
// terra-alpha cluster in the broad-dsde-alpha project.
func (c *kubeconfig) addCluster(cluster terra.Cluster) (Kubectx, error) {
	clusterCtx := kubectxForCluster(cluster)
	if err := c.writeContextIfNeeded(clusterCtx); err != nil {
		return kubectx{}, err
	}
	return clusterCtx, nil
}
// writeContextIfNeeded writes ctx to the kubeconfig file unless this instance has already written a context
// with the same name. This saves Thelma a Google Cloud API call for every `kubectl` command it runs.
//
// The cache lookup and the write are not performed atomically, so two concurrent calls for the same context
// may both write it; the file lock taken in writeContext keeps those writes from interleaving.
func (c *kubeconfig) writeContextIfNeeded(ctx kubectx) error {
	name := ctx.contextName

	c.mutex.Lock()
	_, alreadyWritten := c.writtenCtxs[name]
	c.mutex.Unlock()
	if alreadyWritten {
		return nil
	}

	if err := c.writeContext(ctx); err != nil {
		return err
	}

	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.writtenCtxs[name] = struct{}{}
	return nil
}
// writeContext writes a context with the given name, namespace and target cluster to the kubeconfig file.
// The write happens while holding c.locker, a file lock that prevents concurrent updates from stomping on one
// another.
//
// Errors are wrapped (not flattened into a string), so callers can still inspect the underlying cause with
// errors.Is / errors.As / errors.Cause.
func (c *kubeconfig) writeContext(ctx kubectx) error {
	log.Debug().
		Str("context", ctx.contextName).
		Str("namespace", ctx.namespace).
		Str("cluster", ctx.cluster.Name()).
		Msgf("Generating %s entry for %s", c.file, ctx.contextName)

	err := c.locker.WithLock(func() error {
		return c.writeContextUnsafe(ctx)
	})
	if err != nil {
		// Wrapf yields the same "...: <cause>" message as before but keeps the cause chain intact
		return errors.Wrapf(err, "error generating kubectx for cluster %s", ctx.cluster.Name())
	}
	return nil
}
// writeContextUnsafe adds (or overwrites) the entries needed for ctx in the kubeconfig file, then writes the
// whole config back to disk. It does not synchronize access to the file, hence "unsafe"; writeContext calls it
// while holding the file lock.
//
// Steps:
//  1. load the existing kubeconfig, or start from an empty one if the file does not exist
//  2. look the cluster up via the GKE API to obtain its CA certificate
//  3. upsert a cluster entry named after the cluster, a context entry named ctx.contextName, and the shared
//     defaultAuthInfo entry carrying a fresh access token from c.tokenSource
func (c *kubeconfig) writeContextUnsafe(ctx kubectx) error {
	cfg, err := c.readKubecfg()
	if err != nil {
		return err
	}
	if cfg == nil {
		// the kubeconfig file does not exist yet
		cfg = &clientcmdapi.Config{}
	}

	cluster := ctx.cluster
	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s",
		cluster.Project(),
		cluster.Location(),
		cluster.Name())
	clusterData, err := c.gkeClient.GetCluster(context.Background(), &containerpb.GetClusterRequest{
		Name: clusterPath,
	})
	if err != nil {
		return fmt.Errorf("looking up GKE cluster %s: %w", clusterPath, err)
	}

	// Protobuf getters are nil-safe; check explicitly instead of dereferencing a possibly-nil MasterAuth.
	masterAuth := clusterData.GetMasterAuth()
	if masterAuth == nil {
		return fmt.Errorf("cluster %s: GKE response has no master auth data", clusterPath)
	}
	caCert, err := base64.StdEncoding.DecodeString(masterAuth.GetClusterCaCertificate())
	if err != nil {
		return fmt.Errorf("decoding CA certificate for cluster %s: %w", clusterPath, err)
	}

	// Add cluster definition to kubeconfig
	if cfg.Clusters == nil {
		cfg.Clusters = make(map[string]*clientcmdapi.Cluster)
	}
	cfg.Clusters[cluster.Name()] = &clientcmdapi.Cluster{
		Server:                   cluster.Address(),
		CertificateAuthorityData: caCert,
	}

	// Add context pointing at that cluster and the requested namespace
	if cfg.Contexts == nil {
		cfg.Contexts = make(map[string]*clientcmdapi.Context)
	}
	cfg.Contexts[ctx.contextName] = &clientcmdapi.Context{
		Cluster:   cluster.Name(),
		Namespace: ctx.namespace,
		AuthInfo:  defaultAuthInfo,
	}

	if cfg.AuthInfos == nil {
		cfg.AuthInfos = make(map[string]*clientcmdapi.AuthInfo)
	}
	token, err := c.tokenSource.Token()
	if err != nil {
		return fmt.Errorf("obtaining access token for cluster %s: %w", clusterPath, err)
	}
	// https://kubernetes.io/docs/reference/access-authn-authz/authentication/#configuration
	cfg.AuthInfos[defaultAuthInfo] = &clientcmdapi.AuthInfo{
		// TODO we can use an exec command here to run thelma and print out the auth token.
		// This would make kubecfg entries stable/re-usable, so we only need to generate them once.
		// Eg. Exec: &clientcmdapi.ExecConfig{
		//   Command: "thelma",
		//   Args: []string{"auth", "gcp", "--access-token", "--cluster", cluster.Name(), "--echo"},
		// },
		Token: token.AccessToken,
	}

	return clientcmd.WriteToFile(*cfg, c.file)
}
// readKubecfg loads the kubeconfig file at c.file.
// It returns (nil, nil) when the file does not exist, letting callers start from an empty config.
func (c *kubeconfig) readKubecfg() (*clientcmdapi.Config, error) {
	exists, err := utils.FileExists(c.file)
	switch {
	case err != nil:
		return nil, err
	case !exists:
		return nil, nil
	default:
		return clientcmd.LoadFromFile(c.file)
	}
}
// kubectxForEnvironment builds the environment's default kubectx: named after the environment, targeting its
// namespace on its default cluster.
func kubectxForEnvironment(env terra.Environment) kubectx {
	var ktx kubectx
	ktx.contextName = env.Name()
	ktx.cluster = env.DefaultCluster()
	ktx.namespace = env.Namespace()
	return ktx
}
// kubectxForRelease builds the kubectx for a release: named by contextNameForRelease (which panics for a
// non-cluster release that is not a terra.AppRelease) and targeting the release's own cluster and namespace.
func kubectxForRelease(release terra.Release) kubectx {
	var ktx kubectx
	ktx.contextName = contextNameForRelease(release)
	ktx.cluster = release.Cluster()
	ktx.namespace = release.Namespace()
	return ktx
}
// kubectxForCluster builds a cluster-level kubectx named by contextNameForCluster.
func kubectxForCluster(cluster terra.Cluster) kubectx {
	var ktx kubectx
	ktx.contextName = contextNameForCluster(cluster)
	ktx.cluster = cluster
	// namespace intentionally left empty: the context uses the cluster's default namespace
	return ktx
}
// contextNameForCluster returns "cluster_<cluster name>". The "cluster" prefix keeps these names from
// colliding with environment context names.
func contextNameForCluster(cluster terra.Cluster) string {
	prefix := "cluster" + ctxDelimiter
	return prefix + cluster.Name()
}
// contextNameForRelease computes the name of the kubecfg context for a release:
//   - cluster release: "<cluster>_<release>"
//   - app release deployed to its environment's default cluster: "<environment>"
//     (eg. "alpha", "staging", "fiab-choover-funky-squirrel")
//   - app release deployed to any other cluster: "<environment>_<release>" (eg. "staging_datarepo")
//
// ctxDelimiter ("_") is not valid in environment or cluster names, which helps keep these names unique.
//
// Panics if a non-cluster release does not implement terra.AppRelease, since that is a programming error.
func contextNameForRelease(release terra.Release) string {
	if release.IsClusterRelease() {
		return release.Cluster().Name() + ctxDelimiter + release.Name()
	}
	appRelease, ok := release.(terra.AppRelease)
	if !ok {
		// report the original release: appRelease is the nil zero value here and would print as <nil>
		panic(errors.Errorf("failed to cast release %s (%T) to AppRelease", release.Name(), release))
	}
	env := appRelease.Environment()
	if appRelease.Cluster().Name() != env.DefaultCluster().Name() {
		return env.Name() + ctxDelimiter + release.Name()
	}
	return env.Name()
}