This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 53
/
client.go
144 lines (117 loc) · 4.06 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Package k8s provides a simple implementation of a KubeClient that caches reads and falls back
// to making direct API calls on failure. Write calls are not cached.
package k8s
import (
"context"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
// kubeClient pairs a controller-runtime client with a controller-runtime cache
// and exposes both through the GetClient and GetCache accessors.
type kubeClient struct {
// client is the value handed out by GetClient.
client client.Client
// cache is the value handed out by GetCache.
cache cache.Cache
}
// GetClient returns the controller-runtime client stored in this kubeClient.
func (k *kubeClient) GetClient() client.Client {
	return k.client
}
// GetCache returns the controller-runtime cache stored in this kubeClient.
func (k *kubeClient) GetCache() cache.Cache {
	return k.cache
}
// newKubeClient wraps the given client and cache in a kubeClient and returns it
// as a core.KubeClient.
func newKubeClient(c client.Client, cache cache.Cache) core.KubeClient {
	kc := &kubeClient{
		client: c,
		cache:  cache,
	}
	return kc
}
// fallbackClientReader is a reader that delegates Get and List calls to a list
// of readers, trying them in order until one succeeds.
type fallbackClientReader struct {
// orderedClients holds the readers in the order in which they are tried.
orderedClients []client.Reader
}
// Get asks each reader in orderedClients, in order, to fetch the object
// identified by key into out, and stops at the first reader that succeeds.
//
// It returns nil on the first success. If every reader fails, the error from
// the last reader is returned. With no readers configured it returns nil.
func (c fallbackClientReader) Get(ctx context.Context, key client.ObjectKey, out client.Object) (err error) {
	for _, reader := range c.orderedClients {
		err = reader.Get(ctx, key, out)
		if err == nil {
			// First successful read wins; remaining readers are not consulted.
			break
		}
	}

	return err
}
// List asks each reader in orderedClients, in order, to populate list using the
// given options, and stops at the first reader that succeeds.
//
// It returns nil on the first success. If every reader fails, the error from
// the last reader is returned. With no readers configured it returns nil.
func (c fallbackClientReader) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) (err error) {
	for _, reader := range c.orderedClients {
		err = reader.List(ctx, list, opts...)
		if err == nil {
			// First successful read wins; remaining readers are not consulted.
			break
		}
	}

	return err
}
// ClientBuilder is the interface for the client builder. It lets callers
// register objects that should not be cached before building the client.
type ClientBuilder interface {
// WithUncached takes a list of runtime objects (plain or lists) that users don't want to cache
// for this client. This function can be called multiple times; it should append to an internal
// slice rather than replace it.
WithUncached(objs ...client.Object) ClientBuilder
// Build returns a new client built from the given cache, REST config and client options.
Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error)
}
// fallbackClientBuilder is the ClientBuilder implementation returned by
// NewFallbackClientBuilder.
type fallbackClientBuilder struct {
// uncached accumulates the objects passed to WithUncached; Build forwards
// them as the delegating client's UncachedObjects.
uncached []client.Object
}
// WithUncached appends objs to the builder's list of uncached objects and
// returns the same builder so calls can be chained. Repeated calls accumulate.
func (f *fallbackClientBuilder) WithUncached(objs ...client.Object) ClientBuilder {
	f.uncached = append(f.uncached, objs...)

	return f
}
// Build creates a direct API client from config and options, then wraps it in a
// delegating client whose reads go through a fallbackClientReader that tries the
// supplied cache first and the direct client second. Objects registered through
// WithUncached are passed along as UncachedObjects.
//
// Errors from client.New and client.NewDelegatingClient are returned unchanged.
//
// Build uses a pointer receiver to match WithUncached: only *fallbackClientBuilder
// satisfies ClientBuilder, and mixing receiver kinds on one type copies the struct
// needlessly on every call.
func (f *fallbackClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
	c, err := client.New(config, options)
	if err != nil {
		return nil, err
	}

	return client.NewDelegatingClient(client.NewDelegatingClientInput{
		Client: c,
		CacheReader: fallbackClientReader{
			// Order matters: the cache is consulted before the direct client.
			orderedClients: []client.Reader{cache, c},
		},
		UncachedObjects: f.uncached,
		// TODO figure out if this should be true?
		// CacheUnstructured: true,
	})
}
// NewFallbackClientBuilder returns a ClientBuilder for k8s clients that use the
// cached client for reads and fall back to making API calls if the cached read
// fails. Write calls always go to the raw client directly.
func NewFallbackClientBuilder() ClientBuilder {
	return new(fallbackClientBuilder)
}
// Options configures NewKubeClient. Every field is optional; NewKubeClient
// substitutes a default for each field that is nil.
type Options struct {
// MapperProvider builds the RESTMapper for a REST config. When nil,
// NewKubeClient uses apiutil.NewDynamicRESTMapper.
MapperProvider func(*rest.Config) (meta.RESTMapper, error)
// CacheOptions are passed to cache.New. When nil, NewKubeClient uses
// cache.Options with only Mapper set to the mapper from MapperProvider.
CacheOptions *cache.Options
// ClientOptions are passed to the fallback client builder's Build. When nil,
// NewKubeClient uses client.Options with only Mapper set to the mapper from
// MapperProvider.
ClientOptions *client.Options
}
// NewKubeClient creates a new KubeClient that caches reads and falls back to
// making API calls on failure. Write calls are not cached.
//
// Nil fields in options are defaulted as follows:
//   - MapperProvider: apiutil.NewDynamicRESTMapper.
//   - CacheOptions:   cache.Options{Mapper: mapper}.
//   - ClientOptions:  client.Options{Mapper: mapper}.
//
// where mapper is the RESTMapper produced by MapperProvider for config. Caller
// supplied CacheOptions and ClientOptions are used as given.
//
// It returns the first error encountered while building the mapper, the cache,
// or the fallback client.
func NewKubeClient(config *rest.Config, options Options) (core.KubeClient, error) {
	if options.MapperProvider == nil {
		options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) {
			// Use the provider's own argument rather than the captured outer
			// config so the default honours whatever config it is called with.
			return apiutil.NewDynamicRESTMapper(c)
		}
	}

	mapper, err := options.MapperProvider(config)
	if err != nil {
		return nil, err
	}

	if options.CacheOptions == nil {
		options.CacheOptions = &cache.Options{Mapper: mapper}
	}

	// Named kubeCache (not cache) so the imported cache package is not shadowed.
	kubeCache, err := cache.New(config, *options.CacheOptions)
	if err != nil {
		return nil, err
	}

	if options.ClientOptions == nil {
		options.ClientOptions = &client.Options{Mapper: mapper}
	}

	fallbackClient, err := NewFallbackClientBuilder().Build(kubeCache, config, *options.ClientOptions)
	if err != nil {
		return nil, err
	}

	return newKubeClient(fallbackClient, kubeCache), nil
}
// NewDefaultKubeClient builds a KubeClient for config using a zero-valued
// Options, so every option takes the default chosen by NewKubeClient.
// The resulting client caches reads and falls back to making API calls on
// failure. Write calls are not cached.
func NewDefaultKubeClient(config *rest.Config) (core.KubeClient, error) {
	var defaults Options

	return NewKubeClient(config, defaults)
}