forked from kiali/kiali
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_factory.go
137 lines (113 loc) · 3.39 KB
/
client_factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package kubernetes
import (
"sync"
"time"
"k8s.io/client-go/rest"
"github.com/kiali/kiali/log"
"github.com/kiali/kiali/prometheus/internalmetrics"
)
var factory *clientFactory
// Mutex for when modifying the stored clients
var mutex = &sync.RWMutex{}
const expirationTime = time.Minute * 15
// ClientFactory interface for the clientFactory object
type ClientFactory interface {
GetClient(token string) (IstioClientInterface, error)
}
// clientFactory used to generate per users clients
type clientFactory struct {
ClientFactory
baseIstioConfig *rest.Config
clientEntries map[string]*clientEntry
}
// clientEntry stored the client and its created timestamp
type clientEntry struct {
client IstioClientInterface
created time.Time
}
// GetClientFactory returns the client factory. Creates a new one if necessary
func GetClientFactory() (ClientFactory, error) {
if factory == nil {
// Get the normal configuration
config, err := ConfigClient()
if err != nil {
return nil, err
}
// Create a new config based on what was gathered above but don't specify the bearer token to use
istioConfig := rest.Config{
Host: config.Host,
TLSClientConfig: config.TLSClientConfig,
QPS: config.QPS,
Burst: config.Burst,
}
return getClientFactory(&istioConfig, expirationTime)
}
return factory, nil
}
// newClientFactory allows for specifying the config and expiry duration
// Mock friendly for testing purposes
func getClientFactory(istioConfig *rest.Config, expiry time.Duration) (*clientFactory, error) {
mutex.Lock()
if factory == nil {
clientEntriesMap := make(map[string]*clientEntry)
factory = &clientFactory{
baseIstioConfig: istioConfig,
clientEntries: clientEntriesMap,
}
go watchClients(clientEntriesMap, expiry)
}
mutex.Unlock()
return factory, nil
}
// NewClient creates a new IstioClientInterface based on a users k8s token
func (cf *clientFactory) newClient(token string) (IstioClientInterface, error) {
config := cf.baseIstioConfig
config.BearerToken = token
return NewClientFromConfig(config)
}
// GetClient returns a client for the specified token. Creating one if necessary.
func (cf *clientFactory) GetClient(token string) (IstioClientInterface, error) {
clientEntry, err := cf.getClientEntry(token)
if err != nil {
return nil, err
}
return clientEntry.client, nil
}
// getClientEntry returns a clientEntry for the specified token. Creating one if necessary.
func (cf *clientFactory) getClientEntry(token string) (*clientEntry, error) {
mutex.RLock()
cEntry, ok := cf.clientEntries[token]
mutex.RUnlock()
if ok {
return cEntry, nil
} else {
client, err := cf.newClient(token)
if err != nil {
log.Errorf("Error fetching the Kubernetes client: %v", err)
return nil, err
}
cEntry := clientEntry{
client: client,
created: time.Now(),
}
mutex.Lock()
cf.clientEntries[token] = &cEntry
mutex.Unlock()
internalmetrics.SetKubernetesClients(len(cf.clientEntries))
return &cEntry, nil
}
}
// watchClients loops over clients and removes ones which are too old
func watchClients(clientEntries map[string]*clientEntry, expiry time.Duration) {
for {
time.Sleep(expiry)
mutex.Lock()
for token, clientEntry := range clientEntries {
if time.Since(clientEntry.created) > expiry {
delete(clientEntries, token)
}
}
internalmetrics.SetKubernetesClients(len(clientEntries))
mutex.Unlock()
}
}