forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
environmentresolvercache.go
149 lines (135 loc) · 3.36 KB
/
environmentresolvercache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package service
import (
"fmt"
"os"
"strconv"
"strings"
"sync"
"k8s.io/kubernetes/pkg/api"
)
// ServiceRetriever is an interface for retrieving services by name.
// Its Get method has the same signature as ResolverCacheFunc.
type ServiceRetriever interface {
// Get returns the service with the given name, or an error.
Get(name string) (*api.Service, error)
}
// serviceEntry is the value stored in the resolver cache for one service:
// the host and port, both kept as strings.
type serviceEntry struct {
	host, port string
}
// ResolverCacheFunc looks up a service by name. ServiceResolverCache calls it
// to fill the cache when a name has not been cached yet.
type ResolverCacheFunc func(name string) (*api.Service, error)
// ServiceResolverCache caches the host and port of services by name, calling
// its fill function to look up names that are not cached yet.
type ServiceResolverCache struct {
// fill looks up a service that is not in the cache.
fill ResolverCacheFunc
// cache maps a service name to its resolved host and port.
cache map[string]serviceEntry
// lock guards cache.
lock sync.RWMutex
}
// NewServiceResolverCache returns an empty ServiceResolverCache that calls
// fill to look up any service that is not cached yet.
func NewServiceResolverCache(fill ResolverCacheFunc) *ServiceResolverCache {
	c := new(ServiceResolverCache)
	c.fill = fill
	c.cache = map[string]serviceEntry{}
	return c
}
// get returns the host and port of the named service. It consults the cache
// first and calls the fill function on a miss, storing the result. ok is
// false when the fill function returns an error or the service has no ports;
// such failures are not cached, so a later call performs the lookup again.
func (c *ServiceResolverCache) get(name string) (host, port string, ok bool) {
	// Fast path: a cache hit only needs the read lock.
	c.lock.RLock()
	cached, hit := c.cache[name]
	c.lock.RUnlock()
	if hit {
		return cached.host, cached.port, true
	}

	// Slow path: take the write lock and look again, because another caller
	// may have stored the entry between the two locks. The fill function is
	// called while the write lock is held.
	c.lock.Lock()
	defer c.lock.Unlock()
	if cached, hit = c.cache[name]; hit {
		return cached.host, cached.port, true
	}

	svc, err := c.fill(name)
	if err != nil || len(svc.Spec.Ports) == 0 {
		return "", "", false
	}

	// Only the first port of the service is used.
	entry := serviceEntry{
		host: svc.Spec.ClusterIP,
		port: strconv.Itoa(svc.Spec.Ports[0].Port),
	}
	c.cache[name] = entry
	return entry.host, entry.port, true
}
// toServiceName converts the prefix of an environment variable name into a
// service name: underscores become dashes, letters are lower-cased, and
// surrounding whitespace is removed (for example "MY_DB" becomes "my-db").
func toServiceName(envName string) string {
	dashed := strings.Replace(envName, "_", "-", -1)
	lowered := strings.ToLower(dashed)
	return strings.TrimSpace(lowered)
}
// recognizeVariable parses an environment variable name of the form
// <NAME>_SERVICE_HOST or <NAME>_SERVICE_PORT. It returns the service name
// derived from <NAME>, whether the variable asks for the host (true) or the
// port (false), and ok=true on success. ok is false when the name has neither
// suffix or when the derived service name is empty.
func recognizeVariable(name string) (service string, host bool, ok bool) {
	const (
		hostSuffix = "_SERVICE_HOST"
		portSuffix = "_SERVICE_PORT"
	)

	var prefix string
	if strings.HasSuffix(name, hostSuffix) {
		prefix, host = strings.TrimSuffix(name, hostSuffix), true
	} else if strings.HasSuffix(name, portSuffix) {
		prefix = strings.TrimSuffix(name, portSuffix)
	} else {
		return "", false, false
	}

	service = toServiceName(prefix)
	if service == "" {
		return "", false, false
	}
	return service, host, true
}
// resolve returns the value for a service environment variable name: the
// service host for a *_SERVICE_HOST name and the service port for a
// *_SERVICE_PORT name. The boolean is false when the name is not recognized
// or the service lookup fails.
func (c *ServiceResolverCache) resolve(name string) (string, bool) {
	service, wantHost, recognized := recognizeVariable(name)
	if !recognized {
		return "", false
	}

	host, port, found := c.get(service)
	switch {
	case !found:
		return "", false
	case wantHost:
		return host, true
	default:
		return port, true
	}
}
// Defer takes a string that may contain variable references and returns a
// function that can be called later to obtain the expanded value.
//
// Every referenced variable must be of the form <NAME>_SERVICE_HOST or
// <NAME>_SERVICE_PORT; otherwise an error naming the invalid variables is
// returned. When env contains no variable references, the returned function
// simply returns env.
//
// The returned function reports false if any variable cannot be resolved. A
// failed attempt is not remembered, so the next call tries again. After the
// first success the expanded value is kept and returned on later calls
// without resolving again.
func (c *ServiceResolverCache) Defer(env string) (func() (string, bool), error) {
	var (
		sawVariable bool
		invalid     []string
	)
	// Dry run: only checks that every variable name is recognized; nothing is
	// resolved and the expansion result is discarded.
	os.Expand(env, func(name string) string {
		sawVariable = true
		if _, _, ok := recognizeVariable(name); !ok {
			invalid = append(invalid, name)
		}
		return ""
	})
	if len(invalid) > 0 {
		return nil, fmt.Errorf("invalid variable name(s): %s", strings.Join(invalid, ", "))
	}
	if !sawVariable {
		return func() (string, bool) { return env, true }, nil
	}

	// mu guards done and value so the expansion is stored at most once.
	var (
		mu    sync.Mutex
		done  bool
		value string
	)
	resolver := func() (string, bool) {
		mu.Lock()
		defer mu.Unlock()
		if done {
			return value, true
		}

		allResolved := true
		expanded := os.Expand(env, func(name string) string {
			v, ok := c.resolve(name)
			if !ok {
				allResolved = false
			}
			return v
		})
		if !allResolved {
			return "", false
		}

		value, done = expanded, true
		return value, true
	}
	return resolver, nil
}