package workspaceusage

import (
	"bytes"
	"context"
	"flag"
	"os"
	"sort"
	"sync"
	"time"

	"github.com/google/uuid"

	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/database/dbauthz"

	"cdr.dev/slog"
	"cdr.dev/slog/sloggers/sloghuman"
)

// DefaultFlushInterval is the interval at which Tracker flushes its
// internal map of used workspace IDs unless configured otherwise.
var DefaultFlushInterval = 60 * time.Second

// Store is a subset of database.Store.
type Store interface {
	BatchUpdateWorkspaceLastUsedAt(context.Context, database.BatchUpdateWorkspaceLastUsedAtParams) error
}
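
// A minimal in-memory fake satisfying Store can stand in for the real
// database in tests. This is a sketch; `fakeStore` and its fields are
// hypothetical, not part of this package:
//
//	type fakeStore struct {
//		mu      sync.Mutex
//		batches []database.BatchUpdateWorkspaceLastUsedAtParams
//	}
//
//	func (f *fakeStore) BatchUpdateWorkspaceLastUsedAt(_ context.Context, p database.BatchUpdateWorkspaceLastUsedAtParams) error {
//		f.mu.Lock()
//		defer f.mu.Unlock()
//		f.batches = append(f.batches, p)
//		return nil
//	}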

// Tracker tracks and debounces updates to workspace usage activity.
// It keeps an internal map of workspace IDs that have been used and
// periodically flushes this to its configured Store.
type Tracker struct {
	log         slog.Logger      // you know, for logs
	flushLock   sync.Mutex       // protects m
	flushErrors int              // tracks the number of consecutive errors flushing
	m           *uuidSet         // stores workspace ids
	s           Store            // for flushing data
	tickCh      <-chan time.Time // controls flush interval
	stopTick    func()           // stops flushing
	stopCh      chan struct{}    // signals us to stop
	stopOnce    sync.Once        // because you only stop once
	doneCh      chan struct{}    // signifies that we have stopped
	flushCh     chan int         // used for testing
}

// New returns a new Tracker. It is the caller's responsibility
// to call Close().
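//
// A minimal usage sketch (assumes a Store implementation `db` and a
// workspace ID `wsID`; both names are illustrative):
//
//	tr := workspaceusage.New(db, workspaceusage.WithFlushInterval(time.Minute))
//	defer tr.Close()
//	tr.Add(wsID) // mark the workspace as recently used; flushed on the next tick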
func New(s Store, opts ...Option) *Tracker {
	tr := &Tracker{
		log:      slog.Make(sloghuman.Sink(os.Stderr)),
		m:        &uuidSet{},
		s:        s,
		tickCh:   nil,
		stopTick: nil,
		stopCh:   make(chan struct{}),
		doneCh:   make(chan struct{}),
		flushCh:  nil,
	}
	for _, opt := range opts {
		opt(tr)
	}
	if tr.tickCh == nil && tr.stopTick == nil {
		tick := time.NewTicker(DefaultFlushInterval)
		tr.tickCh = tick.C
		tr.stopTick = tick.Stop
	}
	go tr.loop()
	return tr
}

type Option func(*Tracker)

// WithLogger sets the logger to be used by Tracker.
func WithLogger(log slog.Logger) Option {
	return func(h *Tracker) {
		h.log = log
	}
}

// WithFlushInterval allows configuring the flush interval of Tracker.
func WithFlushInterval(d time.Duration) Option {
	return func(h *Tracker) {
		ticker := time.NewTicker(d)
		h.tickCh = ticker.C
		h.stopTick = ticker.Stop
	}
}

// WithTickFlush allows passing two channels: one from which Tracker
// reads tick times, and one to which it sends the number of marked
// workspaces after each flush.
// For testing only; it panics if used outside of tests.
func WithTickFlush(tickCh <-chan time.Time, flushCh chan int) Option {
	if flag.Lookup("test.v") == nil {
		panic("developer error: WithTickFlush is not to be used outside of tests.")
	}
	return func(h *Tracker) {
		h.tickCh = tickCh
		h.stopTick = func() {}
		h.flushCh = flushCh
	}
}
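
// A sketch of test wiring with this option (valid only under `go test`,
// since WithTickFlush panics otherwise; `store` is assumed to satisfy
// Store):
//
//	tickCh := make(chan time.Time)
//	flushCh := make(chan int, 1)
//	tr := New(store, WithTickFlush(tickCh, flushCh))
//	defer tr.Close()
//	tickCh <- time.Now() // trigger a flush immediately
//	flushed := <-flushCh // number of workspace IDs in that flush
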
// Add marks the workspace with the given ID as having been used recently.
// Tracker will periodically flush this to its configured Store.
func (tr *Tracker) Add(workspaceID uuid.UUID) {
	tr.m.Add(workspaceID)
}

// flush updates last_used_at of all current workspace IDs.
// If a previous flush is still in progress, it blocks until the
// previous flush has completed.
func (tr *Tracker) flush(now time.Time) {
	// Copy our current set of IDs
	ids := tr.m.UniqueAndClear()
	count := len(ids)
	if tr.flushCh != nil { // only used for testing
		defer func() {
			tr.flushCh <- count
		}()
	}
	if count == 0 {
		tr.log.Debug(context.Background(), "nothing to flush")
		return
	}
	// Set a short-ish timeout for this. We don't want to hang forever.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// nolint: gocritic // system function
	authCtx := dbauthz.AsSystemRestricted(ctx)
	tr.flushLock.Lock()
	defer tr.flushLock.Unlock()
	if err := tr.s.BatchUpdateWorkspaceLastUsedAt(authCtx, database.BatchUpdateWorkspaceLastUsedAtParams{
		LastUsedAt: now,
		IDs:        ids,
	}); err != nil {
		// A single failure to flush is likely not a huge problem. If the workspace is still connected at
		// the next iteration, either another coderd instance will likely have this data or the CLI
		// will tell us again that the workspace is in use.
		tr.flushErrors++
		if tr.flushErrors > 1 {
			tr.log.Error(ctx, "multiple failures updating workspaces last_used_at", slog.F("count", count), slog.F("consecutive_errors", tr.flushErrors), slog.Error(err))
			// TODO: if this keeps failing, it indicates a fundamental problem with the database connection.
			// How to surface it correctly to admins besides just screaming into the logs?
		} else {
			tr.log.Warn(ctx, "failed updating workspaces last_used_at", slog.F("count", count), slog.Error(err))
		}
		return
	}
	tr.flushErrors = 0
	tr.log.Info(ctx, "updated workspaces last_used_at", slog.F("count", count), slog.F("now", now))
}

// loop periodically flushes every tick.
// If loop is called after Close, it will exit immediately and log an error.
func (tr *Tracker) loop() {
	select {
	case <-tr.doneCh:
		tr.log.Error(context.Background(), "developer error: loop called after Close")
		return
	default:
	}
	defer func() {
		close(tr.doneCh)
		tr.log.Debug(context.Background(), "workspace usage tracker loop exited")
	}()
	for {
		select {
		case <-tr.stopCh:
			return
		case now, ok := <-tr.tickCh:
			if !ok {
				return
			}
			// NOTE: we do not update last_used_at with the time at which each workspace was added.
			// Instead, we update with the time of the flush. If the BatchUpdateWorkspaceLastUsedAt
			// query can be rewritten to update each id with a corresponding last_used_at timestamp
			// then we could capture the exact usage time of each workspace. For now, however, as
			// we perform this query at a regular interval, the time of the flush is 'close enough'
			// for the purposes of dormancy (and, in future, autostop).
			tr.flush(now.UTC())
		}
	}
}

// Close stops Tracker and returns once loop has exited.
// After calling Close(), the Tracker must not be reused.
func (tr *Tracker) Close() error {
	tr.stopOnce.Do(func() {
		tr.stopCh <- struct{}{}
		tr.stopTick()
		<-tr.doneCh
	})
	return nil
}
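
// Close is idempotent: stopOnce guarantees the shutdown sequence runs
// once, so repeated calls are harmless (sketch; `store` is illustrative):
//
//	tr := New(store)
//	_ = tr.Close()
//	_ = tr.Close() // no-op: the loop has already stopped
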
// uuidSet is a set of UUIDs. Safe for concurrent usage.
// The zero value can be used.
type uuidSet struct {
	l sync.Mutex
	m map[uuid.UUID]struct{}
}

func (s *uuidSet) Add(id uuid.UUID) {
	s.l.Lock()
	defer s.l.Unlock()
	if s.m == nil {
		s.m = make(map[uuid.UUID]struct{})
	}
	s.m[id] = struct{}{}
}

// UniqueAndClear returns the unique set of entries in s and
// resets the internal map.
func (s *uuidSet) UniqueAndClear() []uuid.UUID {
	s.l.Lock()
	defer s.l.Unlock()
	if s.m == nil {
		s.m = make(map[uuid.UUID]struct{})
		return []uuid.UUID{}
	}
	l := make([]uuid.UUID, 0)
	for k := range s.m {
		l = append(l, k)
	}
	// For ease of testing, sort the IDs lexically.
	sort.Slice(l, func(i, j int) bool {
		// Byte arrays compare with == but have no defined ordering,
		// so compare them as byte slices instead.
		// See https://github.com/golang/go/issues/61004
		return bytes.Compare(l[i][:], l[j][:]) < 0
	})
	clear(s.m)
	return l
}
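
// A sketch of the set's contract (illustrative values):
//
//	var s uuidSet // the zero value is ready to use
//	id := uuid.New()
//	s.Add(id)
//	s.Add(id)                 // duplicates collapse into a single entry
//	ids := s.UniqueAndClear() // []uuid.UUID{id}; the set is now empty
//	ids = s.UniqueAndClear()  // []uuid.UUID{}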