/
controller.go
140 lines (113 loc) · 3.33 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package controller
import (
"fmt"
"time"
"github.com/arunprasadmudaliar/chronos/pkg/utils"
"github.com/sirupsen/logrus"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type event struct {
key string
eventType string
resourceType string
}
type controller struct {
client kubernetes.Interface
informer cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
}
func Start(config string) {
kc, err := utils.GetClient(config)
if err != nil {
logrus.Fatal(err)
}
factory := informers.NewSharedInformerFactory(kc, 0)
informer := factory.Core().V1().Pods().Informer()
c := newController(kc, informer)
stopCh := make(chan struct{})
defer close(stopCh)
c.Run(stopCh)
}
func newController(kc kubernetes.Interface, informer cache.SharedIndexInformer) *controller {
q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
var event event
var err error
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
event.key, err = cache.MetaNamespaceKeyFunc(obj)
event.eventType = "create"
if err == nil {
q.Add(event)
}
logrus.Infof("Event received of type [%s] for [%s]", event.eventType, event.key)
},
UpdateFunc: func(old, new interface{}) {
event.key, err = cache.MetaNamespaceKeyFunc(old)
event.eventType = "update"
if err == nil {
q.Add(event)
}
logrus.Infof("Event received of type [%s] for [%s]", event.eventType, event.key)
},
DeleteFunc: func(obj interface{}) {
event.key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
event.eventType = "delete"
if err == nil {
q.Add(event)
}
logrus.Infof("Event received of type [%s] for [%s]", event.eventType, event.key)
},
})
return &controller{
client: kc,
informer: informer,
queue: q,
}
}
func (c *controller) Run(stopper <-chan struct{}) {
defer utilruntime.HandleCrash() //this will handle panic and won't crash the process
defer c.queue.ShutDown() //shutdown all workqueue and terminate all workers
logrus.Info("Starting Chronos...")
go c.informer.Run(stopper)
logrus.Info("Synchronizing events...")
//synchronize the cache before starting to process events
if !cache.WaitForCacheSync(stopper, c.informer.HasSynced) {
utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
logrus.Info("synchronization failed...")
return
}
logrus.Info("synchronization complete!")
logrus.Info("Ready to process events")
wait.Until(c.runWorker, time.Second, stopper)
}
func (c *controller) runWorker() {
for c.processNextItem() {
// continue looping
}
}
func (c *controller) processNextItem() bool {
e, term := c.queue.Get()
if term {
return false
}
err := c.processItem(e.(event))
if err == nil {
c.queue.Forget(e)
return true
}
return true
}
func (c *controller) processItem(e event) error {
obj, _, err := c.informer.GetIndexer().GetByKey(e.key)
if err != nil {
return fmt.Errorf("Error fetching object with key %s from store: %v", e.key, err)
}
//Use a switch clause instead and process the events based on the type
logrus.Infof("Chronos has processed 1 event of type [%s] for object [%s]", e.eventType, obj)
return nil
}