-
Notifications
You must be signed in to change notification settings - Fork 23
/
provide.go
209 lines (184 loc) · 6.84 KB
/
provide.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package k8s
import (
"context"
"fmt"
"net/http"
"strings"
"github.com/go-logr/zerologr"
"go.uber.org/fx"
autoscalingv1 "k8s.io/api/autoscaling/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/scale"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
"github.com/fluxninja/aperture/v2/pkg/log"
commonhttp "github.com/fluxninja/aperture/v2/pkg/net/http"
"github.com/fluxninja/aperture/v2/pkg/utils"
)
var (
// swagger:operation POST /kubernetes_client common-configuration KubernetesClient
// ---
// x-fn-config-env: true
// parameters:
// - name: http_client
// in: body
// schema:
// "$ref": "#/definitions/HTTPClientConfig"
// kubernetesClientConfigKey is the top-level config key under which the Kubernetes client settings are stored.
kubernetesClientConfigKey = "kubernetes_client"
// httpConfigKey is the config key of the Kubernetes HTTP client settings:
// kubernetesClientConfigKey and "http_client" joined with a ".".
httpConfigKey = strings.Join([]string{kubernetesClientConfigKey, "http_client"}, ".")
)
// K8sClientConstructorIn holds the fx-injected parameters consumed by Providek8sClient.
// NOTE(review): the previous wording also named Providek8sDynamicClient, which
// is not defined in this file — confirm whether it still exists elsewhere.
type K8sClientConstructorIn struct {
fx.In
// HTTPClient is the HTTP client injected under the name "k8s-http-client".
HTTPClient *http.Client `name:"k8s-http-client"`
// Shutdowner is forwarded to NewK8sClient, which passes it to utils.Shutdown
// when client construction hits an unexpected error.
Shutdowner fx.Shutdowner
}
// Module bundles the fx options needed to obtain a K8sClient: the annotated
// constructor for the "k8s-http-client" HTTP client (configured from
// httpConfigKey) and the Providek8sClient provider that consumes it.
func Module() fx.Option {
	httpClientOption := commonhttp.ClientConstructor{
		Name:      "k8s-http-client",
		ConfigKey: httpConfigKey,
	}.Annotate()
	k8sClientOption := fx.Provide(Providek8sClient)

	return fx.Options(httpClientOption, k8sClientOption)
}
// K8sClient provides an interface for kubernetes client.
// It groups the typed clientset, scale client, dynamic client and REST mapper
// behind one abstraction; RealK8sClient is the implementation in this file.
type K8sClient interface {
// GetClientSet returns the typed Kubernetes clientset.
GetClientSet() *kubernetes.Clientset
// GetScaleClient returns the client used to access scale subresources.
GetScaleClient() scale.ScalesGetter
// GetDynamicClient returns the dynamic client.
GetDynamicClient() dynamic.Interface
// GetRESTMapper returns the REST mapper used to resolve a GroupKind to its REST mappings.
GetRESTMapper() apimeta.RESTMapper
// ScaleForGroupKind fetches the scale of the object with the given namespace
// and name (in that argument order) for the given GroupKind, and also returns
// the GroupResource through which the scale was found.
ScaleForGroupKind(context.Context, string, string, schema.GroupKind) (*autoscalingv1.Scale, schema.GroupResource, error)
}
// RealK8sClient provides access to Kubernetes Clients.
// All fields are populated together by NewK8sClient.
type RealK8sClient struct {
// clientSet is the typed clientset; its DiscoveryClient also backs mapper and scaleClient.
clientSet *kubernetes.Clientset
// scaleClient reads scale subresources; used by ScaleForGroupKind.
scaleClient scale.ScalesGetter
// dynamicClient is the dynamic client built from the same rest.Config as clientSet.
dynamicClient dynamic.Interface
// mapper resolves a GroupKind to REST mappings via cached discovery.
mapper apimeta.RESTMapper
}
// Compile-time assertion that *RealK8sClient satisfies the K8sClient interface.
var _ K8sClient = (*RealK8sClient)(nil)
// NewK8sClient builds a RealK8sClient from the in-cluster configuration, using
// httpClient for transport and timeout settings.
//
// When newK8sClientSet reports rest.ErrNotInCluster the function returns
// (nil, nil): running outside a cluster is not treated as an error. Any other
// error from newK8sClientSet is returned unchanged. As a side effect, klog is
// redirected to the "k8s-client" component logger.
func NewK8sClient(httpClient *http.Client, shutdowner fx.Shutdowner) (*RealK8sClient, error) {
	clientSet, restConfig, err := newK8sClientSet(httpClient, shutdowner)
	switch {
	case err == rest.ErrNotInCluster:
		log.Info().Msg("Not in Kubernetes Cluster, creating nil client")
		return nil, nil
	case err != nil:
		return nil, err
	}

	// Send client-go's klog output through the component logger.
	klog.SetLogger(zerologr.New(log.WithComponent("k8s-client").GetZerolog()))

	// The REST mapper resolves kinds through a memory-cached discovery client,
	// while the scale kind resolver talks to the discovery client directly.
	discovery := clientSet.DiscoveryClient
	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(discovery))

	scales, err := scale.NewForConfig(
		restConfig,
		restMapper,
		dynamic.LegacyAPIPathResolverFunc,
		scale.NewDiscoveryScaleKindResolver(discovery),
	)
	if err != nil {
		log.Fatal().Err(err).Msg("Unexpected error, unable to create Kubernetes Scale Client")
		utils.Shutdown(shutdowner)
		return nil, err
	}

	dyn, err := dynamic.NewForConfig(restConfig)
	if err != nil {
		log.Fatal().Err(err).Msg("Unexpected error, unable to create Kubernetes Dynamic Client")
		utils.Shutdown(shutdowner)
		return nil, err
	}

	k8sClient := &RealK8sClient{
		clientSet:     clientSet,
		scaleClient:   scales,
		dynamicClient: dyn,
		mapper:        restMapper,
	}
	return k8sClient, nil
}
// GetClientSet returns the typed Kubernetes clientset held by this client.
// It reads a field of the receiver, so calling it on a nil *RealK8sClient
// (which NewK8sClient returns outside a cluster) panics.
func (r *RealK8sClient) GetClientSet() *kubernetes.Clientset {
return r.clientSet
}
// GetScaleClient returns the scale client held by this client.
// It reads a field of the receiver, so calling it on a nil *RealK8sClient
// (which NewK8sClient returns outside a cluster) panics.
func (r *RealK8sClient) GetScaleClient() scale.ScalesGetter {
return r.scaleClient
}
// GetDynamicClient returns the dynamic client held by this client.
// It reads a field of the receiver, so calling it on a nil *RealK8sClient
// (which NewK8sClient returns outside a cluster) panics.
func (r *RealK8sClient) GetDynamicClient() dynamic.Interface {
return r.dynamicClient
}
// GetRESTMapper returns the REST mapper held by this client.
// It reads a field of the receiver, so calling it on a nil *RealK8sClient
// (which NewK8sClient returns outside a cluster) panics.
func (r *RealK8sClient) GetRESTMapper() apimeta.RESTMapper {
return r.mapper
}
// ScaleForGroupKind looks up the scale of the object identified by namespace
// and name for the given group/kind.
//
// The REST mapper is asked for every mapping of groupKind, and the scale is
// requested through each mapped resource in order until one request succeeds.
// On success the scale and the GroupResource that served it are returned.
// If every attempt fails, the error from the first attempt is returned; if the
// mapper yields no mappings at all, an "unrecognized resource" error is
// returned. A failure of the mapper lookup itself is returned unchanged.
func (r *RealK8sClient) ScaleForGroupKind(ctx context.Context, namespace, name string, groupKind schema.GroupKind) (*autoscalingv1.Scale, schema.GroupResource, error) {
	var noResource schema.GroupResource

	candidates, err := r.GetRESTMapper().RESTMappings(groupKind)
	if err != nil {
		return nil, noResource, err
	}

	var firstErr error
	for _, candidate := range candidates {
		resource := candidate.Resource.GroupResource()
		found, getErr := r.GetScaleClient().Scales(namespace).Get(ctx, resource, name, metav1.GetOptions{})
		if getErr == nil {
			return found, resource, nil
		}
		// Keep only the earliest failure; later mappings are still tried in
		// case one of them works.
		if firstErr == nil {
			firstErr = getErr
		}
	}

	if firstErr != nil {
		return nil, noResource, firstErr
	}
	// No mappings were returned, so nothing was even attempted.
	return nil, noResource, fmt.Errorf("unrecognized resource")
}
// Providek8sClient is the fx provider for K8sClient. It builds the client via
// NewK8sClient (which also sets the klog logger) from the injected HTTP client
// and shutdowner.
//
// It returns a nil K8sClient and a nil error when NewK8sClient yields no
// client (i.e. when not running inside a Kubernetes cluster), and a nil
// K8sClient together with the error when construction fails.
func Providek8sClient(in K8sClientConstructorIn) (K8sClient, error) {
	client, err := NewK8sClient(in.HTTPClient, in.Shutdowner)
	if err != nil {
		return nil, err
	}
	if client == nil {
		// Return an untyped nil: converting a nil *RealK8sClient to K8sClient
		// would produce a non-nil interface value, so consumers checking
		// `client == nil` would go on to call methods on a nil receiver.
		return nil, nil
	}
	return client, nil
}
// newK8sClientSet builds a Kubernetes clientset from the in-cluster
// configuration, sending its traffic through the transport of the supplied
// HTTP client.
//
// It returns the clientset together with the rest.Config it was created from.
// An error from rest.InClusterConfig is returned unchanged so that callers can
// compare it against rest.ErrNotInCluster. Every later failure is logged at
// fatal level, reported to shutdowner and returned.
//
// client.Transport must be a non-nil *http.Transport; any other value is
// reported as an error instead of panicking. Side effect: the TLSClientConfig
// of that transport is overwritten with the TLS settings derived from the
// in-cluster configuration.
//
// NOTE(review): if log.Fatal terminates the process once the message is sent,
// the utils.Shutdown calls and returns that follow it are unreachable — confirm.
func newK8sClientSet(client *http.Client, shutdowner fx.Shutdowner) (*kubernetes.Clientset, *rest.Config, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		log.Info().Msg("Not in Kubernetes cluster, could not create Kubernetes client")
		return nil, nil, err
	}

	transportConfig, err := config.TransportConfig()
	if err != nil {
		// Call shutdowner to catch this issue early since Kubernetes Client is marked as optional downstream.
		log.Fatal().Err(err).Msg("Unexpected error creating Kubernetes Client's transport config")
		utils.Shutdown(shutdowner)
		return nil, nil, err
	}

	tlsConfig, err := transport.TLSConfigFor(transportConfig)
	if err != nil {
		log.Fatal().Err(err).Msg("Unexpected error creating Kubernetes Client's TLS Config")
		utils.Shutdown(shutdowner)
		return nil, nil, err
	}

	// http.Client.Transport is an http.RoundTripper that may be nil or any
	// implementation, so assert with a check rather than risk a panic. The
	// local is not called "transport" to avoid shadowing the imported package.
	httpTransport, ok := client.Transport.(*http.Transport)
	if !ok || httpTransport == nil {
		err = fmt.Errorf("kubernetes http client transport is %T, want non-nil *http.Transport", client.Transport)
		log.Fatal().Err(err).Msg("Unexpected error configuring Kubernetes Client's transport")
		utils.Shutdown(shutdowner)
		return nil, nil, err
	}

	// Move the TLS settings onto the shared transport and clear them from the
	// rest.Config, which is then pointed at that transport.
	// NOTE(review): presumably required because client-go rejects a custom
	// Transport combined with TLS options on the same config — confirm.
	httpTransport.TLSClientConfig = tlsConfig
	config.TLSClientConfig = rest.TLSClientConfig{}
	config.Transport = httpTransport
	config.Timeout = client.Timeout

	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal().Err(err).Msg("Unexpected error creating Kubernetes Client")
		utils.Shutdown(shutdowner)
		return nil, nil, err
	}
	return clientSet, config, nil
}