-
Notifications
You must be signed in to change notification settings - Fork 1k
/
resourcestore.go
229 lines (207 loc) · 7.37 KB
/
resourcestore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package resourcestore
import (
"context"
"fmt"
"sync"
"time"
"github.com/cri-o/cri-o/internal/log"
"github.com/sirupsen/logrus"
)
const (
sleepTimeBeforeCleanup = 1 * time.Minute
StageUnknown = "unknown"
)
// ResourceStore is a structure that saves information about a recently created resource.
// Resources can be added and retrieved from the store. A retrieval (Get) also removes the Resource from the store.
// The ResourceStore comes with a cleanup routine that loops through the resources and marks them as stale, or removes
// them if they're already stale, then sleeps for `timeout`.
// Thus, it takes between `timeout` and `2*timeout` for unrequested resources to be cleaned up.
// Another routine can request a watcher for a resource by calling WatcherForResource.
// All watchers will be notified when the resource has successfully been created.
type ResourceStore struct {
resources map[string]*Resource
timeout time.Duration
closeChan chan struct{}
closed bool
mutex sync.Mutex
}
// Resource contains the actual resource itself (which must implement the IdentifiableCreatable interface),
// as well as stores function pointers that pertain to how that resource should be cleaned up,
// and keeps track of other requests that are watching for the successful creation of this resource.
type Resource struct {
resource IdentifiableCreatable
cleaner *ResourceCleaner
watchers []chan struct{}
stale bool
name string
stage string
}
// wasPut checks that a resource has been fully defined yet.
// This is defined as a resource that only has watchers, but no associated resource.
func (r *Resource) wasPut() bool {
return r != nil && r.resource != nil
}
// IdentifiableCreatable are the qualities needed by the caller of the resource.
// Once a resource is retrieved, SetCreated() will be called, indicating to the server
// that resource is ready to be listed and operated upon, and ID() will be used to identify the
// newly created resource to the server.
type IdentifiableCreatable interface {
ID() string
SetCreated()
}
// New creates a new ResourceStore, with a default timeout, and starts the cleanup function
func New() *ResourceStore {
return NewWithTimeout(sleepTimeBeforeCleanup)
}
// NewWithTimeout is used for testing purposes. It allows the caller to set the timeout, allowing for faster tests.
// Most callers should use New instead.
func NewWithTimeout(timeout time.Duration) *ResourceStore {
rc := &ResourceStore{
resources: make(map[string]*Resource),
closeChan: make(chan struct{}, 1),
timeout: timeout,
}
go rc.cleanupStaleResources()
return rc
}
func (rc *ResourceStore) Close() {
rc.mutex.Lock()
defer rc.mutex.Unlock()
if rc.closed {
return
}
close(rc.closeChan)
rc.closed = true
}
// cleanupStaleResources is responsible for cleaning up resources that haven't been gotten
// from the store.
// It runs on a loop, sleeping `sleepTimeBeforeCleanup` between each loop.
// A resource will first be marked as stale before being cleaned up.
// This means a resource will stay in the store between `sleepTimeBeforeCleanup` and `2*sleepTimeBeforeCleanup`.
// When a resource is cleaned up, it's removed from the store and the cleanup funcs in its cleaner are called.
func (rc *ResourceStore) cleanupStaleResources() {
for {
select {
case <-rc.closeChan:
return
case <-time.After(rc.timeout):
}
resourcesToReap := []*Resource{}
rc.mutex.Lock()
for name, r := range rc.resources {
// this resource shouldn't be marked as stale if it
// hasn't yet been added to the store.
// This can happen if a creation is in progress, and a watcher is added
// before the creation completes.
// If this resource isn't skipped from being marked as stale,
// we risk segfaulting in the Cleanup() step.
if !r.wasPut() {
continue
}
if r.stale {
resourcesToReap = append(resourcesToReap, r)
delete(rc.resources, name)
}
r.stale = true
}
// no need to hold the lock when running the cleanup functions
rc.mutex.Unlock()
for _, r := range resourcesToReap {
logrus.Infof("Cleaning up stale resource %s", r.name)
if err := r.cleaner.Cleanup(); err != nil {
logrus.Errorf("Unable to cleanup: %v", err)
}
}
}
}
// Get attempts to look up a resource by its name.
// If it's found, it's removed from the store, and it is set as created.
// Get returns an empty ID if the resource is not found,
// and returns the value of the Resource's ID() method if it is.
func (rc *ResourceStore) Get(name string) string {
rc.mutex.Lock()
defer rc.mutex.Unlock()
r, ok := rc.resources[name]
if !ok {
return ""
}
// It is possible there are existing watchers,
// but no resource created yet
if !r.wasPut() {
return ""
}
delete(rc.resources, name)
r.resource.SetCreated()
return r.resource.ID()
}
// Put takes a unique resource name (retrieved from the client request, not generated by the server),
// a newly created resource, and functions to clean up that newly created resource.
// It adds the Resource to the ResourceStore. It expects name to be unique, and
// returns an error if a duplicate name is detected.
func (rc *ResourceStore) Put(name string, resource IdentifiableCreatable, cleaner *ResourceCleaner) error {
rc.mutex.Lock()
defer rc.mutex.Unlock()
r, ok := rc.resources[name]
// if we don't already have a resource, create it
if !ok {
r = &Resource{}
rc.resources[name] = r
}
// make sure the resource hasn't already been added to the store
if ok && r.wasPut() {
return fmt.Errorf("failed to add entry %s to ResourceStore; entry already exists", name)
}
r.resource = resource
r.cleaner = cleaner
r.name = name
// now the resource is created, notify the watchers
for _, w := range r.watchers {
w <- struct{}{}
}
return nil
}
// Delete deletes the specified resource from the store.
// Any resource that has a stage set, but was never Put should have Delete called, or else it will leak.
func (rc *ResourceStore) Delete(name string) {
rc.mutex.Lock()
defer rc.mutex.Unlock()
delete(rc.resources, name)
}
// WatcherForResource looks up a Resource by name, and gives it a watcher.
// If no entry exists for that resource, a placeholder is created and a watcher is given to that
// placeholder resource.
// A watcher can be used for concurrent processes to wait for the resource to be created.
// This is useful for situations where clients retry requests quickly after they "fail" because
// they've taken too long. Adding a watcher allows the server to slow down the client, but still
// return the resource in a timely manner once it's actually created.
func (rc *ResourceStore) WatcherForResource(name string) (watcher chan struct{}, stage string) {
rc.mutex.Lock()
defer rc.mutex.Unlock()
watcher = make(chan struct{}, 1)
r, ok := rc.resources[name]
if !ok {
rc.resources[name] = &Resource{
watchers: []chan struct{}{watcher},
name: name,
}
return watcher, StageUnknown
}
r.watchers = append(r.watchers, watcher)
return watcher, r.stage
}
func (rc *ResourceStore) SetStageForResource(ctx context.Context, name, stage string) {
rc.mutex.Lock()
defer rc.mutex.Unlock()
r, ok := rc.resources[name]
if !ok {
log.Debugf(ctx, "Initializing stage for resource %s to %s", name, stage)
rc.resources[name] = &Resource{
watchers: []chan struct{}{},
name: name,
stage: stage,
}
return
}
log.Debugf(ctx, "Setting stage for resource %s from %s to %s", name, r.stage, stage)
r.stage = stage
}