forked from weaveworks/scope
/
collector.go
338 lines (303 loc) · 9.05 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
package app
import (
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/ugorji/go/codec"
	"golang.org/x/net/context"

	"github.com/weaveworks/common/mtime"
	"github.com/weaveworks/scope/report"
)
// reportQuantisationInterval is the bucket width used by quantise: reports
// received within this interval of the first report in a bucket are merged
// together and the originals discarded. Higher figures improve the
// performance of Report(), but at the expense of lower time resolution,
// since time is effectively advancing in quanta.
//
// The current figure is identical to the default probe.publishInterval,
// which results in performance improvements as soon as there is more than
// one probe.
const reportQuantisationInterval time.Duration = 3 * time.Second
// Reporter is something that can produce reports on demand, and lets callers
// register channels to be told when a new report has been received. It's a
// convenient interface for parts of the app, and several experimental
// components.
type Reporter interface {
	// Report returns the current report.
	Report(ctx context.Context) (report.Report, error)
	// WaitOn registers waiter to be signalled when a new report is received.
	WaitOn(ctx context.Context, waiter chan struct{})
	// UnWait deregisters a waiter previously passed to WaitOn.
	UnWait(ctx context.Context, waiter chan struct{})
}
// Adder is something that can accept reports. It's a convenient interface for
// parts of the app, and several experimental components. Add receives:
//   - ctx: the request context
//   - rpt: the deserialised report
//   - buf: the serialised report (as gzip'd msgpack)
type Adder interface {
	Add(ctx context.Context, rpt report.Report, buf []byte) error
}
// A Collector is a Reporter and an Adder
type Collector interface {
Reporter
Adder
}
// collector receives published reports from multiple producers. It yields a
// single merged report, representing all collected reports that are still
// within window. It is the Collector implementation returned by NewCollector.
type collector struct {
// mtx guards reports, timestamps and cached; Add and Report hold it.
mtx sync.Mutex
// reports holds the retained reports, index-aligned with timestamps.
reports []report.Report
// timestamps[i] is when reports[i] was added (mtime.Now in Add) or, after
// quantise, the start time of the quantum that reports[i] merges.
timestamps []time.Time
// window is the retention period: clean drops reports whose timestamp is
// not after now-window.
window time.Duration
// cached memoises the last result of Report; Add resets it to nil.
cached *report.Report
// merger combines a slice of reports into one.
merger Merger
// waitableCondition holds the waiters that Add signals for Shortcut reports.
waitableCondition
}
// waitableCondition keeps a set of waiter channels that Broadcast signals.
// The embedded Mutex guards the waiters set. The zero value is ready to use:
// the set is allocated lazily on the first WaitOn.
type waitableCondition struct {
	sync.Mutex
	waiters map[chan struct{}]struct{}
}

// WaitOn registers waiter so that subsequent Broadcast calls signal it.
// The context is ignored.
func (wc *waitableCondition) WaitOn(_ context.Context, waiter chan struct{}) {
	wc.Lock()
	defer wc.Unlock()
	// Allocate on demand so a zero-value waitableCondition does not panic
	// on assignment to a nil map.
	if wc.waiters == nil {
		wc.waiters = make(map[chan struct{}]struct{})
	}
	wc.waiters[waiter] = struct{}{}
}

// UnWait deregisters waiter. Unknown waiters (and a nil set) are ignored.
// The context is ignored.
func (wc *waitableCondition) UnWait(_ context.Context, waiter chan struct{}) {
	wc.Lock()
	defer wc.Unlock()
	delete(wc.waiters, waiter)
}

// Broadcast signals every registered waiter without blocking. A waiter whose
// channel cannot accept a value at that moment (no ready receiver, or a full
// buffer) misses this signal.
func (wc *waitableCondition) Broadcast() {
	wc.Lock()
	defer wc.Unlock()
	for waiter := range wc.waiters {
		select {
		case waiter <- struct{}{}:
		default: // never let a slow waiter block the broadcaster
		}
	}
}
// NewCollector returns a ready-to-use Collector that retains added reports
// for the given window and merges them with a smart merger.
func NewCollector(window time.Duration) Collector {
	c := &collector{
		window: window,
		merger: NewSmartMerger(),
	}
	c.waitableCondition.waiters = make(map[chan struct{}]struct{})
	return c
}
// Add records rpt stamped with the current (mtime) time. It then evicts
// reports that have fallen out of the window and invalidates the cached
// merge. A Shortcut report additionally signals every registered waiter.
// It implements Adder and always returns nil; the context and serialised
// bytes are ignored.
func (c *collector) Add(_ context.Context, rpt report.Report, _ []byte) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Stamp under the lock so timestamps stay in arrival order.
	stamp := mtime.Now()
	c.reports, c.timestamps = append(c.reports, rpt), append(c.timestamps, stamp)
	c.clean()
	c.cached = nil

	if !rpt.Shortcut {
		return nil
	}
	c.Broadcast()
	return nil
}
// Report returns a single report merging every retained report, passed
// through Upgrade. Reports received close together are first collapsed by
// quantise. The result is cached and reused until the next Add or until the
// oldest retained report leaves the window. It implements Reporter and always
// returns a nil error.
func (c *collector) Report(_ context.Context) (report.Report, error) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Add clears the cache, so a non-nil cache is still valid unless its
	// oldest input has since aged out.
	if cached := c.cached; cached != nil && len(c.reports) > 0 {
		cutoff := mtime.Now().Add(-c.window)
		if c.timestamps[0].After(cutoff) {
			return *cached, nil
		}
	}

	c.clean()
	c.quantise()
	merged := c.merger.Merge(c.reports).Upgrade()
	c.cached = &merged
	return merged, nil
}
// clean discards every report whose timestamp is not strictly after
// now-window, keeping reports and timestamps index-aligned. Survivors are
// copied into freshly allocated slices. Callers must hold c.mtx.
func (c *collector) clean() {
	cutoff := mtime.Now().Add(-c.window)
	keptReports := make([]report.Report, 0, len(c.reports))
	keptTimestamps := make([]time.Time, 0, len(c.timestamps))
	for i := range c.reports {
		ts := c.timestamps[i]
		if !ts.After(cutoff) {
			continue
		}
		keptReports = append(keptReports, c.reports[i])
		keptTimestamps = append(keptTimestamps, ts)
	}
	c.reports, c.timestamps = keptReports, keptTimestamps
}
// quantise merges reports received within the same
// reportQuantisationInterval into one, and records the quantum's start time
// as that report's timestamp.
//
// Quantisation is relative to the time of the first report in a given
// interval, rather than absolute time. So, for example, with a
// reportQuantisationInterval of 3s and reports with timestamps [0, 1,
// 2, 5, 6, 7], the result contains merged reports with
// timestamps/content of [0:{0,1,2}, 5:{5,6,7}].
// Callers must hold c.mtx.
func (c *collector) quantise() {
	if len(c.reports) == 0 {
		return
	}
	mergedReports := make([]report.Report, 0, len(c.reports))
	mergedTimestamps := make([]time.Time, 0, len(c.timestamps))

	// The open quantum covers reports[lo:hi]; start is its first timestamp.
	lo := 0
	start := c.timestamps[lo]
	for hi := range c.timestamps {
		ts := c.timestamps[hi]
		if ts.Sub(start) >= reportQuantisationInterval {
			// ts opens a new quantum: close the current one first.
			mergedReports = append(mergedReports, c.merger.Merge(c.reports[lo:hi]))
			mergedTimestamps = append(mergedTimestamps, start)
			lo, start = hi, ts
		}
	}
	// Close the final quantum, which always holds at least one report.
	mergedReports = append(mergedReports, c.merger.Merge(c.reports[lo:]))
	mergedTimestamps = append(mergedTimestamps, c.timestamps[lo])
	c.reports, c.timestamps = mergedReports, mergedTimestamps
}
// StaticCollector is a Collector that always serves the same, fixed report.
// Reports passed to Add are discarded and waiters are never signalled.
type StaticCollector report.Report

// Report returns the fixed report and a nil error. It implements Reporter.
func (c StaticCollector) Report(context.Context) (report.Report, error) {
	return report.Report(c), nil
}

// Add discards its arguments and returns nil; the fixed report never
// changes. It implements Adder.
func (StaticCollector) Add(context.Context, report.Report, []byte) error {
	return nil
}

// WaitOn is a no-op, since a StaticCollector never receives new reports.
// It implements Reporter.
func (StaticCollector) WaitOn(context.Context, chan struct{}) {}

// UnWait is a no-op, the counterpart of WaitOn. It implements Reporter.
func (StaticCollector) UnWait(context.Context, chan struct{}) {}
// NewFileCollector reads and parses the files at path (a file or
// directory) as reports. If there are multiple files, and they all
// have names representing "nanoseconds since epoch" timestamps,
// e.g. "1488557088545489008.msgpack.gz", then the collector will
// return merged reports resulting from replaying the file reports in
// a loop at a sequence and speed determined by the timestamps.
// Otherwise the collector always returns the merger of all reports.
func NewFileCollector(path string, window time.Duration) (Collector, error) {
var (
timestamps []time.Time
reports []report.Report
)
allTimestamped := true
if err := filepath.Walk(path,
func(p string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
t, err := timestampFromFilepath(p)
if err != nil {
allTimestamped = false
}
timestamps = append(timestamps, t)
rpt, err := readReport(p)
if err != nil {
return err
}
reports = append(reports, rpt)
return nil
}); err != nil {
return nil, err
}
if len(reports) > 1 && allTimestamped {
collector := NewCollector(window)
go replay(collector, timestamps, reports)
return collector, nil
}
return StaticCollector(NewSmartMerger().Merge(reports).Upgrade()), nil
}
// timestampFromFilepath takes the base name of path, strips every extension
// (everything from the first '.'), and parses the rest as nanoseconds since
// the Unix epoch. For example, "/dumps/1488557088545489008.msgpack.gz"
// yields time.Unix(0, 1488557088545489008). A non-numeric stem yields the
// zero time and an error.
func timestampFromFilepath(path string) (time.Time, error) {
	stem := filepath.Base(path)
	// Cutting at the first dot is the same as repeatedly trimming filepath.Ext.
	if dot := strings.IndexByte(stem, '.'); dot >= 0 {
		stem = stem[:dot]
	}
	nanos, err := strconv.ParseInt(stem, 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("filename '%s' is not a number (representing nanoseconds since epoch): %v", stem, err)
	}
	return time.Unix(0, nanos), nil
}
// readReport decodes the report stored in the file at path. The codec is
// chosen from the extension: ".json" (JSON) or ".msgpack" (MessagePack),
// optionally followed by ".gz" for gzip compression, e.g.
// "1488557088545489008.msgpack.gz". Any other extension is an error.
// I/O, decompression and decoding errors are returned.
func readReport(path string) (rpt report.Report, _ error) {
	f, err := os.Open(path)
	if err != nil {
		return rpt, err
	}
	defer f.Close()

	var (
		handle  codec.Handle
		gzipped bool
	)
	fileType := filepath.Ext(path)
	if fileType == ".gz" {
		gzipped = true
		fileType = filepath.Ext(strings.TrimSuffix(path, fileType))
	}
	switch fileType {
	case ".json":
		handle = &codec.JsonHandle{}
	case ".msgpack":
		handle = &codec.MsgpackHandle{}
	default:
		return rpt, fmt.Errorf("Unsupported file extension: %v", fileType)
	}

	// Read through one io.Reader into the function-scope err. This way
	// errors from the gzip stream (truncated data, CRC mismatch at EOF) are
	// returned instead of being lost to a shadowed err variable.
	var r io.Reader = f
	if gzipped {
		gz, err := gzip.NewReader(f)
		if err != nil {
			return rpt, err
		}
		defer gz.Close()
		r = gz
	}
	buf, err := ioutil.ReadAll(r)
	if err != nil {
		return rpt, err
	}
	err = rpt.ReadBytes(buf, handle)
	return rpt, err
}
// replay feeds reports to a in an endless loop. Add calls are paced so that
// consecutive reports are spaced like their timestamps. After the last report
// it waits the total span divided by the number of reports before starting
// over. timestamps and reports must be index-aligned, and timestamps must be
// non-decreasing (replay panics otherwise). It returns immediately if either
// slice is empty, and otherwise never returns.
func replay(a Adder, timestamps []time.Time, reports []report.Report) {
	l := len(timestamps)
	if l == 0 || len(reports) == 0 {
		// Nothing to replay. An empty timestamps slice would panic on
		// timestamps[:l-1], and an empty reports slice would make the loop
		// below spin forever without sleeping.
		return
	}

	// delays[i] is the pause between report i and report i+1.
	delays := make([]time.Duration, l)
	for i, t := range timestamps[:l-1] {
		delays[i] = timestamps[i+1].Sub(t)
		if delays[i] < 0 {
			panic(fmt.Errorf("replay timestamps are not in order! %v", timestamps))
		}
	}
	// We don't know how long to wait before looping round, so make a
	// good guess.
	delays[l-1] = timestamps[l-1].Sub(timestamps[0]) / time.Duration(l)

	// Pace against an absolute schedule so time spent in Add does not
	// accumulate as drift.
	due := time.Now()
	for {
		for i, r := range reports {
			// Never pass a nil Context. Replay is best-effort, so an Add
			// error is deliberately ignored (collector.Add always returns nil).
			_ = a.Add(context.Background(), r, nil)
			due = due.Add(delays[i])
			if delay := due.Sub(time.Now()); delay > 0 {
				time.Sleep(delay)
			}
		}
	}
}