forked from go-kit/kit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
endpoint_cache.go
143 lines (123 loc) · 3.82 KB
/
endpoint_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package sd
import (
"io"
"sort"
"sync"
"time"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
)
// endpointCache collects the most recent set of instances from a service discovery
// system, creates endpoints for them using a factory function, and makes
// them available to consumers.
//
// Update and Endpoints acquire mtx before reading or modifying the other
// fields; updateCache does no locking of its own.
type endpointCache struct {
// options supplies invalidateOnError and invalidateTimeout, both read by Update.
options endpointerOptions
// mtx is write-locked by Update and read-locked (then write-locked on the
// invalidation path) by Endpoints.
mtx sync.RWMutex
// factory is called by updateCache to build an endpoint and its closer for an
// instance string that is not yet cached.
factory Factory
// cache maps an instance string to the endpoint/closer pair built for it.
cache map[string]endpointCloser
// err is the error recorded by Update when invalidateOnError is set; Update
// resets it to nil on the next error-free event.
err error
// endpoints is the slice handed out by Endpoints; updateCache rebuilds it.
endpoints []endpoint.Endpoint
// logger receives service discovery errors and factory errors.
logger log.Logger
// invalidateDeadline is the instant from which Endpoints stops returning the
// cached endpoints while err is set.
invalidateDeadline time.Time
// timeNow is the clock used for deadline arithmetic; newEndpointCache sets it
// to time.Now (presumably a field so tests can substitute a fake clock).
timeNow func() time.Time
}
// endpointCloser pairs an endpoint with the io.Closer that the factory returned
// alongside it. The Closer may be nil; updateCache checks for that before
// calling Close.
type endpointCloser struct {
endpoint.Endpoint
io.Closer
}
// newEndpointCache builds an endpointCache that holds no endpoints yet. The
// supplied factory, logger and options are stored as given, the instance map
// starts out empty, and the clock is wired to time.Now.
func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
	ec := new(endpointCache)
	ec.options = options
	ec.factory = factory
	ec.cache = make(map[string]endpointCloser)
	ec.logger = logger
	ec.timeNow = time.Now
	return ec
}
// Update feeds a service discovery event into the cache. Clients call it with
// the complete set of current instance strings whenever that set changes.
//
// An event without an error rebuilds the cache from event.Instances (new
// endpoints are manufactured by the factory, vanished ones are closed,
// surviving ones are kept) and clears any recorded error.
//
// An event carrying an error is logged. If invalidateOnError is set and no
// error is recorded yet, the error is stored and the invalidation deadline is
// armed; otherwise the cache is left untouched.
func (c *endpointCache) Update(event Event) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	if sdErr := event.Err; sdErr != nil {
		// Sad path. Something's gone wrong in sd.
		c.logger.Log("err", sdErr)

		// Only the first error of a streak is recorded, and only when the
		// cache is configured to invalidate; in every other case the last
		// known endpoints (and the original error, if any) stay in place.
		if c.options.invalidateOnError && c.err == nil {
			c.err = sdErr
			// Endpoints become invalid at this deadline unless an
			// error-free event arrives first.
			c.invalidateDeadline = c.timeNow().Add(c.options.invalidateTimeout)
		}
		return
	}

	// Happy path.
	c.updateCache(event.Instances)
	c.err = nil
}
// updateCache reconciles the cache with the given instance strings: entries
// that are still present are carried over, missing ones are built with the
// factory, and entries that are no longer listed are closed. Afterwards
// c.cache and c.endpoints reflect exactly the given instances, with endpoints
// ordered lexicographically by instance string. Passing nil closes and drops
// everything.
//
// Instances whose factory call fails are logged and left out. A repeated
// instance string is built only once, so no closer is orphaned; it still
// contributes one slice entry per occurrence, as before.
//
// updateCache does no locking itself; Update and Endpoints call it while
// holding c.mtx for writing.
func (c *endpointCache) updateCache(instances []string) {
	// Deterministic order (for later). Sort a private copy so the caller's
	// slice is not reordered as a side effect.
	sorted := make([]string, len(instances))
	copy(sorted, instances)
	sort.Strings(sorted)

	// Produce the current set of services.
	cache := make(map[string]endpointCloser, len(sorted))
	for _, instance := range sorted {
		// A duplicate of an instance that has already been handled: the
		// old entry was moved out of c.cache, so falling through would call
		// the factory again and overwrite (and thereby leak) the first closer.
		if _, ok := cache[instance]; ok {
			continue
		}

		// If it already exists, just copy it over.
		if sc, ok := c.cache[instance]; ok {
			cache[instance] = sc
			delete(c.cache, instance)
			continue
		}

		// If it doesn't exist, create it.
		service, closer, err := c.factory(instance)
		if err != nil {
			c.logger.Log("instance", instance, "err", err)
			continue
		}
		cache[instance] = endpointCloser{service, closer}
	}

	// Close any leftover endpoints. Whatever is still in c.cache was not
	// carried over above, i.e. it is no longer part of the instance set.
	for instance, sc := range c.cache {
		if sc.Closer == nil {
			continue
		}
		if err := sc.Closer.Close(); err != nil {
			// Best effort: report the failure but keep closing the rest.
			c.logger.Log("instance", instance, "err", err)
		}
	}

	// Populate the slice of endpoints.
	endpoints := make([]endpoint.Endpoint, 0, len(cache))
	for _, instance := range sorted {
		// A bad factory may mean an instance is not present.
		if sc, ok := cache[instance]; ok {
			endpoints = append(endpoints, sc.Endpoint)
		}
	}

	// Swap and trigger GC for old copies.
	c.endpoints = endpoints
	c.cache = cache
}
// Endpoints returns the current set of (presumably identical) endpoints,
// ordered lexicographically by the corresponding instance string.
//
// As long as no error is recorded, or the invalidation deadline has not been
// reached yet, the cached endpoints are returned with a nil error. Once an
// error is recorded and the deadline has passed, any remaining endpoints are
// closed and the recorded error is returned with a nil slice.
func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
	// In the steady state many goroutines call Endpoints() concurrently, so
	// the common case is decided under a shared R-lock to minimize contention.
	c.mtx.RLock()
	usable := c.err == nil || c.timeNow().Before(c.invalidateDeadline)
	current := c.endpoints
	c.mtx.RUnlock()
	if usable {
		return current, nil
	}

	// In case of an error, switch to an exclusive lock.
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// The state may have changed between RUnlock() and Lock(), so the
	// condition is evaluated again before anything is torn down.
	if c.err != nil && !c.timeNow().Before(c.invalidateDeadline) {
		c.updateCache(nil) // close any remaining active endpoints
		return nil, c.err
	}
	return c.endpoints, nil
}