-
Notifications
You must be signed in to change notification settings - Fork 2
/
manager.go
137 lines (118 loc) · 3.41 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package threads
import (
"context"
"strings"
"sync"
"github.com/3scale-ops/basereconciler/util"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)
type RunnableThread interface {
GetID() string
SetChannel(chan event.GenericEvent)
Start(context.Context, logr.Logger) error
Stop()
IsStarted() bool
CanBeDeleted() bool
}
// Manager is a struct that holds configuration to
// manage concurrent RunnableThreads
type Manager struct {
mu sync.Mutex
channel chan event.GenericEvent
threads map[string]RunnableThread
}
// NewManager returns a new initialized Manager struct
func NewManager() Manager {
return Manager{
channel: make(chan event.GenericEvent),
threads: map[string]RunnableThread{},
}
}
// runThread runs thread and associates it with a given key so it can later be stopped
func (mgr *Manager) runThread(ctx context.Context, key string, thread RunnableThread, log logr.Logger) error {
thread.SetChannel(mgr.channel)
t, ok := mgr.threads[key]
// do nothing if present and already started
if ok && t.IsStarted() {
return nil
}
mgr.mu.Lock()
defer mgr.mu.Unlock()
if !ok {
mgr.threads[key] = thread
}
if err := mgr.threads[key].Start(ctx, log); err != nil {
return err
}
return nil
}
// stopThread stops the thread identified by the given key
func (mgr *Manager) stopThread(key string) {
mgr.mu.Lock()
if _, ok := mgr.threads[key]; !ok {
return
}
mgr.threads[key].Stop()
delete(mgr.threads, key)
mgr.mu.Unlock()
}
// GetChannel returns the channel through which events can be received
// from the running thread
func (mgr *Manager) GetChannel() <-chan event.GenericEvent {
return mgr.channel
}
// ReconcileThreads ensures that the threads identified by the provided keys are running. prefix() is used to identify
// which threads belong to each resource.
func (mgr *Manager) ReconcileThreads(ctx context.Context, owner client.Object, threads []RunnableThread, log logr.Logger) error {
shouldRun := map[string]int{}
for _, thread := range threads {
key := prefix(owner) + thread.GetID()
shouldRun[key] = 1
if err := mgr.runThread(ctx, key, thread, log); err != nil {
return err
}
}
// Stop threads that should not be running anymore
for key := range mgr.threads {
if strings.Contains(key, prefix(owner)) {
if _, ok := shouldRun[key]; !ok && mgr.threads[key].CanBeDeleted() {
mgr.stopThread(key)
}
}
}
return nil
}
// CleanupThreads returns a function that cleans matching threads when invoked.
// This is intended for use as a cleanup function in the finalize phase of a controller's
// reconcile loop.
func (mgr *Manager) CleanupThreads(owner client.Object) func(context.Context, client.Client) error {
return func(context.Context, client.Client) error {
for key := range mgr.threads {
if strings.Contains(key, prefix(owner)) {
mgr.stopThread(key)
}
}
return nil
}
}
// Returns a thread, typically for inspection by the caller (ie get status/errors)
func (mgr *Manager) GetThread(id string, owner client.Object, log logr.Logger) RunnableThread {
key := prefix(owner) + id
if _, ok := mgr.threads[key]; ok {
return mgr.threads[key]
} else {
return nil
}
}
func (mgr *Manager) GetKeys() []string {
keys := make([]string, 0, len(mgr.threads))
for k, _ := range mgr.threads {
keys = append(keys, k)
}
return keys
}
func prefix(o client.Object) string {
return util.ObjectKey(o).String() + "_"
}