forked from kubesphere/kubesphere
-
Notifications
You must be signed in to change notification settings - Fork 0
/
basecontroller.go
127 lines (109 loc) · 3.92 KB
/
basecontroller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
// BaseController provides a Controller template for watching a primary resources that defined as CRD.
type BaseController struct {
// Workers will wait informer caches to be synced
Synced []cache.InformerSynced
// Workqueue is a rate limited work queue.
Workqueue workqueue.RateLimitingInterface
Handler func(key string) error
MaxRetries int
Name string
}
// Run will set up the event handlers for Primary resource, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *BaseController) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.Workqueue.ShutDown()
klog.Infof("Starting controller: %s", c.Name)
klog.Infof("Waiting for informer caches to sync for: %s", c.Name)
if ok := cache.WaitForCacheSync(stopCh, c.Synced...); !ok {
return fmt.Errorf("failed to wait for caches to sync for: %s", c.Name)
}
klog.Infof("Starting workers for: %s", c.Name)
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Infof("Started workers for: %s", c.Name)
<-stopCh
klog.Infof("Shutting down workers for: %s", c.Name)
return nil
}
// Enqueue takes a primary resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than primary resource.
func (c *BaseController) Enqueue(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
c.Workqueue.Add(key)
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *BaseController) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the Handler.
func (c *BaseController) processNextWorkItem() bool {
obj, shutdown := c.Workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.Workqueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.Workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in Workqueue but got %#v in %s", obj, c.Name))
return nil
}
if err := c.Handler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors,
// when the max retries haven't reached or there is no retry times limit.
if c.MaxRetries == 0 || c.Workqueue.NumRequeues(key) < c.MaxRetries {
c.Workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s' in %s: %s, requeuing ", key, c.Name, err.Error())
}
klog.V(4).Infof("Dropping %s out of the queue in %s: %s", key, c.Name, err)
utilruntime.HandleError(err)
return nil
}
c.Workqueue.Forget(obj)
klog.Infof("Successfully Synced %s:%s in %s", "key", key, c.Name)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}