forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
factory.go
146 lines (130 loc) · 4.67 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package controller
import (
"time"
"github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/controller"
"github.com/openshift/origin/pkg/image/api"
)
// ImportControllerFactory can create an ImportController.
// It carries the client, timing and rate limiting settings that Create uses
// to wire up the change-driven importer and the scheduled importer.
type ImportControllerFactory struct {
// Client lists and watches image streams in all namespaces and is also
// handed to the import controller built by Create.
Client client.Interface
// ResyncInterval is passed to the image stream reflector started by Create.
ResyncInterval time.Duration
// MinimumCheckInterval determines how many scheduler buckets Create uses
// and the rate at which those buckets are processed.
MinimumCheckInterval time.Duration
// ImportRateLimiter is passed through as the limiter for scheduled checks.
// It may be nil, in which case scheduled checks are not rate limited.
ImportRateLimiter flowcontrol.RateLimiter
// ScheduleEnabled is passed through as the enabled flag of the scheduled
// importer.
ScheduleEnabled bool
}
// Create creates an ImportController.
//
// It returns two controllers: a retry controller that processes image stream
// changes delivered by a list/watch over all namespaces, and the scheduler
// used for timed re-checks of image streams. Create also runs the reflector
// that feeds the change queue.
func (f *ImportControllerFactory) Create() (controller.RunnableController, controller.StoppableController) {
	lw := &cache.ListWatch{
		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
			return f.Client.ImageStreams(kapi.NamespaceAll).List(options)
		},
		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
			return f.Client.ImageStreams(kapi.NamespaceAll).Watch(options)
		},
	}
	q := cache.NewResyncableFIFO(cache.MetaNamespaceKeyFunc)
	cache.NewReflector(lw, &api.ImageStream{}, q, f.ResyncInterval).Run()

	// instantiate a scheduled importer using a number of buckets
	buckets := 4
	switch {
	case f.MinimumCheckInterval > time.Hour:
		buckets = 8
	case f.MinimumCheckInterval < 10*time.Minute:
		buckets = 2
	}

	// The interval is truncated to whole seconds. Clamp it to at least one
	// second: a zero or sub-second interval would otherwise divide by zero and
	// yield an infinite QPS, and a negative interval a negative one.
	seconds := f.MinimumCheckInterval / time.Second
	if seconds < 1 {
		seconds = 1
	}
	bucketQPS := 1.0 / float32(seconds) * float32(buckets)

	limiter := flowcontrol.NewTokenBucketRateLimiter(bucketQPS, 1)
	b := newScheduled(f.ScheduleEnabled, f.Client, buckets, limiter, f.ImportRateLimiter)

	// instantiate an importer for changes that happen to the image stream
	changed := &controller.RetryController{
		Queue: q,
		RetryManager: controller.NewQueueRetryManager(
			q,
			cache.MetaNamespaceKeyFunc,
			// Report every failure and allow a retry while fewer than five
			// retries have been counted.
			func(obj interface{}, err error, retries controller.Retry) bool {
				utilruntime.HandleError(err)
				return retries.Count < 5
			},
			flowcontrol.NewTokenBucketRateLimiter(1, 10),
		),
		Handle: b.Handle,
	}

	return changed, b.scheduler
}
// uniqueItem is the value stored in the scheduler alongside an image stream
// key. It captures the stream's UID and resource version at the moment the
// stream was added.
type uniqueItem struct {
	uid, resourceVersion string
}
// scheduled watches for changes to image streams and adds them to the list of streams to be
// periodically imported (later) or directly imported (now).
type scheduled struct {
// enabled gates all scheduling work: when false, Handle does not add
// streams to the scheduler, HandleTimed removes the key it is given, and
// Importing does nothing.
enabled bool
// scheduler holds the keys awaiting a timed check and calls HandleTimed
// for them; it is set by newScheduled.
scheduler *controller.Scheduler
// rateLimiter, when non-nil, is consulted by HandleTimed before each
// timed check; a check that is not accepted is skipped.
rateLimiter flowcontrol.RateLimiter
// controller is the ImportController that Handle and HandleTimed
// delegate to.
controller *ImportController
}
// newScheduled builds a scheduled importer backed by the given image stream
// client and attaches a scheduler with the requested number of buckets.
// bucketLimiter is handed to the scheduler; importLimiter limits individual
// timed checks and may be nil.
func newScheduled(enabled bool, client client.ImageStreamsNamespacer, buckets int, bucketLimiter, importLimiter flowcontrol.RateLimiter) *scheduled {
	importer := &ImportController{streams: client}
	s := &scheduled{
		enabled:     enabled,
		rateLimiter: importLimiter,
		controller:  importer,
	}
	// The scheduler calls back into s, so it is attached after s exists.
	s.scheduler = controller.NewScheduler(buckets, bucketLimiter, s.HandleTimed)
	return s
}
// Handle registers the image stream with the scheduler when scheduling is
// enabled and the stream needs it, then always hands the stream to the
// import controller for a direct import. obj must be an *api.ImageStream;
// any other type panics on the type assertion.
func (b *scheduled) Handle(obj interface{}) error {
	imageStream := obj.(*api.ImageStream)
	if !b.enabled || !needsScheduling(imageStream) {
		return b.controller.Next(imageStream, b)
	}

	streamKey, _ := cache.MetaNamespaceKeyFunc(imageStream)
	item := uniqueItem{
		uid:             string(imageStream.UID),
		resourceVersion: imageStream.ResourceVersion,
	}
	b.scheduler.Add(streamKey, item)

	return b.controller.Next(imageStream, b)
}
// HandleTimed is the scheduler callback for a key whose turn has come. key is
// expected to be a namespace/name string and value the item stored with it.
// When scheduling is disabled the entry is simply removed; otherwise the
// stream is checked by name unless the import rate limiter rejects the check.
func (b *scheduled) HandleTimed(key, value interface{}) {
	if !b.enabled {
		b.scheduler.Remove(key, value)
		return
	}

	glog.V(5).Infof("DEBUG: checking %s", key)
	if limiter := b.rateLimiter; limiter != nil {
		if !limiter.TryAccept() {
			glog.V(5).Infof("DEBUG: check of %s exceeded rate limit, will retry later", key)
			return
		}
	}

	ns, streamName, _ := cache.SplitMetaNamespaceKey(key.(string))
	switch err := b.controller.NextTimedByName(ns, streamName); err {
	case nil:
		// checked successfully, nothing further to do
	case ErrNotImportable:
		// The stream cannot be imported. Removal only succeeds when the value
		// matches, so a stream that was re-created in the meantime (different
		// uid or resource version in the scheduler) is not dropped by mistake.
		b.scheduler.Remove(key, value)
	default:
		utilruntime.HandleError(err)
	}
}
// Importing is the hook the controller calls when it decides to import a
// stream. When scheduling is enabled it delays the stream's key in the
// scheduler so the next timed check is pushed back.
func (b *scheduled) Importing(stream *api.ImageStream) {
	if b.enabled {
		glog.V(5).Infof("DEBUG: stream %s was just imported", stream.Name)
		// The stream has just been imported, so move its key to the back of
		// the queue.
		streamKey, _ := cache.MetaNamespaceKeyFunc(stream)
		b.scheduler.Delay(streamKey)
	}
}