-
Notifications
You must be signed in to change notification settings - Fork 134
/
kubernetes_client.go
92 lines (79 loc) · 2.58 KB
/
kubernetes_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package cluster
import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
"github.com/armadaproject/armada/internal/common/armadaerrors"
)
type KubernetesClientProvider interface {
ClientForUser(user string, groups []string) (kubernetes.Interface, error)
Client() kubernetes.Interface
ClientConfig() *rest.Config
}
type ConfigKubernetesClientProvider struct {
restConfig *rest.Config
impersonateUsers bool
client kubernetes.Interface
}
func NewKubernetesClientProvider(impersonateUsers bool, qps float32, burst int) (*ConfigKubernetesClientProvider, error) {
if qps == 0 {
return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "qps",
Value: qps,
Message: "qps must be positive",
})
}
if burst == 0 {
return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "burst",
Value: burst,
Message: "burst must be positive",
})
}
restConfig, err := loadConfig()
if err != nil {
return nil, err
}
// Use a shared rate limiter for all clients created by this provider.
// This limits the total number of concurrent calls across all clients to burst
// and the total number of calls per second to qps.
restConfig.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}
return &ConfigKubernetesClientProvider{
restConfig: restConfig,
impersonateUsers: impersonateUsers,
client: client,
},
nil
}
func (c *ConfigKubernetesClientProvider) Client() kubernetes.Interface {
return c.client
}
func (c *ConfigKubernetesClientProvider) ClientConfig() *rest.Config {
return c.restConfig
}
func (c *ConfigKubernetesClientProvider) ClientForUser(user string, groups []string) (kubernetes.Interface, error) {
if !c.impersonateUsers {
return c.client, nil
}
config := *c.restConfig // shallow copy of the config
config.Impersonate = rest.ImpersonationConfig{UserName: user, Groups: groups}
return kubernetes.NewForConfig(&config)
}
func loadConfig() (*rest.Config, error) {
config, err := rest.InClusterConfig()
if err == rest.ErrNotInCluster {
log.Info("Running with default client configuration")
rules := clientcmd.NewDefaultClientConfigLoadingRules()
overrides := &clientcmd.ConfigOverrides{}
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
}
log.Info("Running with in cluster client configuration")
return config, err
}