forked from flant/shell-operator
/
kube_events_manager.go
141 lines (119 loc) · 3.67 KB
/
kube_events_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package kube_events_manager
import (
"context"
"runtime/trace"
"sync"
log "github.com/sirupsen/logrus"
klient "github.com/akaitux/kube-client/client"
. "github.com/akaitux/shell-operator/pkg/kube_events_manager/types"
"github.com/akaitux/shell-operator/pkg/metric_storage"
)
type KubeEventsManager interface {
WithMetricStorage(mstor *metric_storage.MetricStorage)
AddMonitor(monitorConfig *MonitorConfig) error
HasMonitor(monitorID string) bool
GetMonitor(monitorID string) Monitor
StartMonitor(monitorID string)
StopMonitor(monitorID string) error
Ch() chan KubeEvent
PauseHandleEvents()
}
// kubeEventsManager is a main implementation of KubeEventsManager.
type kubeEventsManager struct {
// channel to emit KubeEvent objects
KubeEventCh chan KubeEvent
KubeClient *klient.Client
ctx context.Context
cancel context.CancelFunc
metricStorage *metric_storage.MetricStorage
m sync.RWMutex
Monitors map[string]Monitor
}
// kubeEventsManager should implement KubeEventsManager.
var _ KubeEventsManager = &kubeEventsManager{}
// NewKubeEventsManager returns an implementation of KubeEventsManager.
func NewKubeEventsManager(ctx context.Context, client *klient.Client) *kubeEventsManager {
cctx, cancel := context.WithCancel(ctx)
em := &kubeEventsManager{
ctx: cctx,
cancel: cancel,
KubeClient: client,
m: sync.RWMutex{},
Monitors: make(map[string]Monitor),
KubeEventCh: make(chan KubeEvent, 1),
}
return em
}
func (mgr *kubeEventsManager) WithMetricStorage(mstor *metric_storage.MetricStorage) {
mgr.metricStorage = mstor
}
// AddMonitor creates a monitor with informers and return a KubeEvent with existing objects.
// TODO cleanup informers in case of error
// TODO use Context to stop informers
func (mgr *kubeEventsManager) AddMonitor(monitorConfig *MonitorConfig) error {
log.Debugf("Add MONITOR %+v", monitorConfig)
monitor := NewMonitor(
mgr.ctx,
mgr.KubeClient,
mgr.metricStorage,
monitorConfig,
func(ev KubeEvent) {
defer trace.StartRegion(context.Background(), "EmitKubeEvent").End()
mgr.KubeEventCh <- ev
})
err := monitor.CreateInformers()
if err != nil {
return err
}
mgr.m.Lock()
mgr.Monitors[monitorConfig.Metadata.MonitorId] = monitor
mgr.m.Unlock()
return nil
}
// HasMonitor returns true if there is a monitor with monitorID.
func (mgr *kubeEventsManager) HasMonitor(monitorID string) bool {
mgr.m.RLock()
_, has := mgr.Monitors[monitorID]
mgr.m.RUnlock()
return has
}
// GetMonitor returns monitor by its ID.
func (mgr *kubeEventsManager) GetMonitor(monitorID string) Monitor {
mgr.m.RLock()
defer mgr.m.RUnlock()
return mgr.Monitors[monitorID]
}
// StartMonitor starts all informers for the monitor.
func (mgr *kubeEventsManager) StartMonitor(monitorID string) {
mgr.m.RLock()
monitor := mgr.Monitors[monitorID]
mgr.m.RUnlock()
monitor.Start(mgr.ctx)
}
// StopMonitor stops monitor and removes it from the index.
func (mgr *kubeEventsManager) StopMonitor(monitorID string) error {
mgr.m.RLock()
monitor, ok := mgr.Monitors[monitorID]
mgr.m.RUnlock()
if ok {
monitor.Stop()
mgr.m.Lock()
delete(mgr.Monitors, monitorID)
mgr.m.Unlock()
}
return nil
}
// Ch returns a channel to receive KubeEvent objects.
func (mgr *kubeEventsManager) Ch() chan KubeEvent {
return mgr.KubeEventCh
}
// PauseHandleEvents set flags for all informers to ignore incoming events.
// Useful for shutdown without panicking.
// Calling cancel() leads to a race and panicking, see https://github.com/kubernetes/kubernetes/issues/59822
func (mgr *kubeEventsManager) PauseHandleEvents() {
mgr.m.RLock()
defer mgr.m.RUnlock()
for _, monitor := range mgr.Monitors {
monitor.PauseHandleEvents()
}
}