/
volumes_watcher.go
183 lines (149 loc) · 4.56 KB
/
volumes_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package volumewatcher
import (
"context"
"sync"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// Watcher tracks the volumes found in the state store and keeps one
// volumeWatcher per volume. A long-lived goroutine (watchVolumes) queries
// the volumes table and registers every volume it sees; what each
// volumeWatcher then does with its volume is defined elsewhere in this
// package.
type Watcher struct {
// enabled gates tracking: addLocked is a no-op while it is false.
enabled bool
// logger is the logger handed to NewVolumesWatcher, named "volumes_watcher".
logger log.Logger
// rpc contains the set of Server methods that can be used by
// the volumes watcher for RPC
rpc CSIVolumeRPC
// the ACL needed to send RPCs
leaderAcl string
// state is the state that is watched for state changes. SetEnabled
// replaces it whenever it is given a non-nil store.
state *state.StateStore
// watchers is the set of active watchers, one per volume, keyed by the
// volume ID concatenated with its namespace.
watchers map[string]*volumeWatcher
// ctx and exitFn are used to cancel the watcher. flush calls exitFn and
// then installs a fresh pair, so these values change over the lifetime of
// the Watcher.
ctx context.Context
exitFn context.CancelFunc
// wlock is held by SetEnabled (which also runs flush) and by add while
// they read or modify the fields above.
wlock sync.RWMutex
}
// NewVolumesWatcher constructs a Watcher for volumes. The returned Watcher
// starts out disabled and with no state store attached; both are supplied
// later through SetEnabled.
func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *Watcher {
	w := &Watcher{
		logger:    logger.Named("volumes_watcher"),
		rpc:       rpc,
		leaderAcl: leaderAcl,
	}

	// The watcher owns its own cancellation rather than accepting a
	// shutdown context from the caller: a leader step-down calls
	// SetEnabled(false), and that is what cancels this context.
	w.ctx, w.exitFn = context.WithCancel(context.Background())
	return w
}
// SetEnabled is used to control if the watcher is enabled. The
// watcher should only be enabled on the active leader. When being
// enabled the state is passed in as it is no longer valid once a
// leader election has taken place. A nil state leaves the previously
// stored state store in place.
//
// Every call flushes the watcher: all per-volume watchers are stopped and
// the watcher context is cancelled and replaced. When enabled is true a new
// watchVolumes goroutine is started on the fresh context.
func (w *Watcher) SetEnabled(enabled bool, state *state.StateStore) {
	w.wlock.Lock()
	defer w.wlock.Unlock()

	w.enabled = enabled
	if state != nil {
		w.state = state
	}

	// Flush the state to create the necessary objects. This cancels the
	// current context, which stops any watchVolumes goroutine that is
	// already running, and installs a new context in w.ctx.
	w.flush(enabled)

	// Because flush always cancels the previous watch goroutine, a new one
	// has to be launched on every enable, not only on a disabled->enabled
	// transition. Otherwise enabling an already-enabled watcher would leave
	// it enabled with nothing watching for volumes.
	if enabled {
		go w.watchVolumes(w.ctx)
	}
}
// flush resets the watcher: it stops every per-volume watcher, cancels the
// current context, and then installs a fresh context and an empty watcher
// set. The enabled argument is currently not consulted. Callers must hold
// w.wlock.
func (w *Watcher) flush(enabled bool) {
	// Halt each tracked volume watcher before dropping the set.
	for key := range w.watchers {
		w.watchers[key].Stop()
	}

	// Cancel anything still running on the old context.
	if stop := w.exitFn; stop != nil {
		stop()
	}

	// Start over with a new context and an empty, pre-sized watcher set.
	freshCtx, freshCancel := context.WithCancel(context.Background())
	w.ctx, w.exitFn = freshCtx, freshCancel
	w.watchers = make(map[string]*volumeWatcher, 32)
}
// watchVolumes is the long-lived goroutine that watches for volumes to
// add and remove watchers on. Each round it asks getVolumes for the volumes
// table at an index past the last one seen and hands every returned volume
// to add, which either starts tracking it or notifies its existing watcher.
// It returns once ctx has been cancelled.
func (w *Watcher) watchVolumes(ctx context.Context) {
	vIndex := uint64(1)
	for {
		volumes, idx, err := w.getVolumes(ctx, vIndex)
		if err != nil {
			// Consult ctx as well as the sentinel so the goroutine still
			// exits if the cancellation error reaches us in a different
			// form (for example wrapped by an intermediate layer).
			if err == context.Canceled || ctx.Err() != nil {
				return
			}
			w.logger.Error("failed to retrieve volumes", "error", err)

			// getVolumes reports index 0 alongside an error. Keep the last
			// index that was actually observed and retry from there instead
			// of overwriting it with that zero value.
			continue
		}

		vIndex = idx // last-seen index
		for _, v := range volumes {
			if err := w.add(v); err != nil {
				w.logger.Error("failed to track volume", "volume_id", v.ID, "error", err)
			}
		}
	}
}
// getVolumes runs getVolumesImpl as a blocking query against the watcher's
// state store, using minIndex as the index to block on. It returns the
// volumes together with the index reported by the query; on failure it
// returns a nil slice, a zero index, and the error.
func (w *Watcher) getVolumes(ctx context.Context, minIndex uint64) ([]*structs.CSIVolume, uint64, error) {
	raw, idx, err := w.state.BlockingQuery(w.getVolumesImpl, minIndex, ctx)
	if err != nil {
		return nil, 0, err
	}

	// getVolumesImpl always produces a []*structs.CSIVolume on success.
	volumes := raw.([]*structs.CSIVolume)
	return volumes, idx, nil
}
// getVolumesImpl is the query function handed to BlockingQuery. It collects
// every volume from the passed state store, registering the lookup on ws,
// and returns them with the current index of the "csi_volumes" table.
func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) {
	iter, err := state.CSIVolumes(ws)
	if err != nil {
		return nil, 0, err
	}

	// Drain the iterator; a nil item marks the end.
	var volumes []*structs.CSIVolume
	for item := iter.Next(); item != nil; item = iter.Next() {
		volumes = append(volumes, item.(*structs.CSIVolume))
	}

	// Use the last index that affected the volume table
	tableIndex, err := state.Index("csi_volumes")
	if err != nil {
		return nil, 0, err
	}
	return volumes, tableIndex, nil
}
// add takes the watcher lock and registers the volume via addLocked,
// discarding the returned volumeWatcher and reporting only the error.
func (w *Watcher) add(d *structs.CSIVolume) error {
	w.wlock.Lock()
	defer w.wlock.Unlock()

	if _, err := w.addLocked(d); err != nil {
		return err
	}
	return nil
}
// addLocked registers a volume with the watcher; the caller must already
// hold w.wlock. It does nothing while the watcher is disabled. A volume that
// is already tracked has its watcher notified with the new copy and nil is
// returned; otherwise a new volumeWatcher is created, stored, and returned.
// Creating the volumeWatcher starts a go routine to .watch() it
func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) {
	// Not enabled so no-op
	if !w.enabled {
		return nil, nil
	}

	// Watchers are keyed by volume ID concatenated with namespace.
	key := v.ID + v.Namespace

	existing, tracked := w.watchers[key]
	if tracked {
		// Already watched so trigger an update for the volume
		existing.Notify(v)
		return nil, nil
	}

	created := newVolumeWatcher(w, v)
	w.watchers[key] = created
	return created, nil
}