mirrored from https://skia.googlesource.com/buildbot
/
dataframe.go
306 lines (280 loc) · 9.69 KB
/
dataframe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
// Package dataframe provides DataFrame which is a TraceSet with a calculated
// ParamSet and associated commit info.
package dataframe
import (
"context"
"fmt"
"sort"
"time"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/timer"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/perf/go/cid"
"go.skia.org/infra/perf/go/types"
)
const (
	// DEFAULT_NUM_COMMITS is the number of commits in the DataFrame returned
	// from New().
	DEFAULT_NUM_COMMITS = 50

	// MAX_SAMPLE_SIZE is the maximum number of commits in a DataFrame when
	// downsampling is requested; see getRange.
	MAX_SAMPLE_SIZE = 256
)
// DataFrameBuilder is an interface for things that construct DataFrames.
type DataFrameBuilder interface {
	// New returns a populated DataFrame of the last 50 commits (see
	// DEFAULT_NUM_COMMITS) or a non-nil error if there was a failure
	// retrieving the traces.
	New(progress types.Progress) (*DataFrame, error)

	// NewN returns a populated DataFrame of the last N commits or a non-nil
	// error if there was a failure retrieving the traces.
	NewN(progress types.Progress, n int) (*DataFrame, error)

	// NewFromQueryAndRange returns a populated DataFrame of the traces that match
	// the given time range [begin, end) and the passed in query, or a non-nil
	// error if the traces can't be retrieved. The 'progress' callback is called
	// periodically as the query is processed.
	NewFromQueryAndRange(begin, end time.Time, q *query.Query, downsample bool, progress types.Progress) (*DataFrame, error)

	// NewFromKeysAndRange returns a populated DataFrame of the traces that match
	// the given set of 'keys' over the range of [begin, end). The 'progress'
	// callback is called periodically as the query is processed.
	NewFromKeysAndRange(keys []string, begin, end time.Time, downsample bool, progress types.Progress) (*DataFrame, error)

	// NewFromCommitIDsAndQuery returns a populated DataFrame of the traces that
	// match the given set of commits 'cids' and the query 'q'. The 'progress'
	// callback is called periodically as the query is processed.
	NewFromCommitIDsAndQuery(ctx context.Context, cids []*cid.CommitID, cidl *cid.CommitIDLookup, q *query.Query, progress types.Progress) (*DataFrame, error)

	// NewNFromQuery returns a populated DataFrame of condensed traces of N data
	// points ending at the given 'end' time that match the given query.
	NewNFromQuery(ctx context.Context, end time.Time, q *query.Query, n int32, progress types.Progress) (*DataFrame, error)

	// NewNFromKeys returns a populated DataFrame of condensed traces of N data
	// points ending at the given 'end' time for the given keys.
	NewNFromKeys(ctx context.Context, end time.Time, keys []string, n int32, progress types.Progress) (*DataFrame, error)

	// TODO Add func to count matches.
	// TODO Add func to get merged paramset for a date range.
}
// ColumnHeader describes each column in a DataFrame.
type ColumnHeader struct {
	// Source identifies where the column came from, e.g. "master" for
	// commits on the main branch (see rangeImpl).
	Source string `json:"source"`

	// Offset is the index of the commit within Source; columns are kept
	// sorted by Offset (see merge).
	Offset int64 `json:"offset"`

	Timestamp int64 `json:"timestamp"` // In seconds from the Unix epoch.
}
// DataFrame stores Perf measurements in a table where each row is a Trace
// indexed by a structured key (see go/query), and each column is described by
// a ColumnHeader, which could be a commit or a trybot patch level.
//
// Skip is the number of commits skipped to bring the DataFrame down
// to less than MAX_SAMPLE_SIZE commits. If Skip is zero then no
// commits were skipped.
//
// The name DataFrame was gratuitously borrowed from R.
type DataFrame struct {
	// TraceSet maps a structured trace key to its values, one value per
	// entry in Header.
	TraceSet types.TraceSet `json:"traceset"`

	// Header describes each column of every trace in TraceSet.
	Header []*ColumnHeader `json:"header"`

	// ParamSet is the merged set of key/value parameters from all keys in
	// TraceSet; kept in sync via BuildParamSet.
	ParamSet paramtools.ParamSet `json:"paramset"`

	// Skip is the number of commits skipped during downsampling; see above.
	Skip int `json:"skip"`
}
// BuildParamSet rebuilds d.ParamSet from the keys of d.TraceSet.
//
// All parameter values are sorted and the ParamSet is normalized before
// being stored back on the DataFrame.
func (d *DataFrame) BuildParamSet() {
	ps := paramtools.ParamSet{}
	for traceKey := range d.TraceSet {
		ps.AddParamsFromKey(traceKey)
	}
	for _, vals := range ps {
		sort.Strings(vals)
	}
	ps.Normalize()
	d.ParamSet = ps
}
// simpleMap returns an identity map over [0, n), i.e. {0:0, 1:1, ..., n-1:n-1}.
// For n <= 0 the returned map is empty (and never nil).
func simpleMap(n int) map[int]int {
	identity := make(map[int]int, n)
	for idx := 0; idx < n; idx++ {
		identity[idx] = idx
	}
	return identity
}
// merge creates a merged header from the two given headers, which must each
// be sorted by Offset.
//
// I.e. {1,4,5} + {3,4} => {1,3,4,5}
//
// It also returns two maps that translate an index in 'a' (resp. 'b') to the
// corresponding index in the merged result, which Join uses to place trace
// values into the right columns.
func merge(a, b []*ColumnHeader) ([]*ColumnHeader, map[int]int, map[int]int) {
	// Trivial cases: one side is empty, so the other side is the result.
	if len(a) == 0 {
		return b, simpleMap(0), simpleMap(len(b))
	} else if len(b) == 0 {
		return a, simpleMap(len(a)), simpleMap(0)
	}
	aMap := map[int]int{}
	bMap := map[int]int{}
	numA := len(a)
	numB := len(b)
	pA := 0
	pB := 0
	ret := []*ColumnHeader{}
	// Standard two-pointer merge over the Offset-sorted inputs.
	for {
		if pA == numA && pB == numB {
			break
		}
		if pA == numA {
			// Copy in the rest of b.
			for i := pB; i < numB; i++ {
				bMap[i] = len(ret)
				ret = append(ret, b[i])
			}
			break
		}
		if pB == numB {
			// Copy in the rest of a.
			for i := pA; i < numA; i++ {
				aMap[i] = len(ret)
				ret = append(ret, a[i])
			}
			// BUG FIX: this break was missing, so control fell through to
			// the comparison below and indexed b[pB] with pB == numB,
			// panicking with an index-out-of-range error whenever 'a'
			// extended past the end of 'b'.
			break
		}
		if a[pA].Offset < b[pB].Offset {
			aMap[pA] = len(ret)
			ret = append(ret, a[pA])
			pA += 1
		} else if a[pA].Offset > b[pB].Offset {
			bMap[pB] = len(ret)
			ret = append(ret, b[pB])
			pB += 1
		} else {
			// Equal offsets: emit one column that both inputs map to.
			aMap[pA] = len(ret)
			bMap[pB] = len(ret)
			ret = append(ret, a[pA])
			pA += 1
			pB += 1
		}
	}
	return ret, aMap, bMap
}
// Join create a new DataFrame that is the union of 'a' and 'b'.
//
// Will handle the case of a and b having data for different sets of commits,
// i.e. a.Header doesn't have to equal b.Header.
func Join(a, b *DataFrame) *DataFrame {
	ret := NewEmpty()
	// Build a merged set of headers. aMap/bMap translate column indices in
	// a/b to column indices in the merged header.
	header, aMap, bMap := merge(a.Header, b.Header)
	ret.Header = header
	if len(a.Header) == 0 {
		// NOTE(review): this mutates the *input* DataFrame 'a' in place,
		// which is surprising for a function documented as producing a
		// union — confirm callers depend on this side effect.
		a.Header = b.Header
	}
	// NOTE(review): Skip is taken from 'b' only; a.Skip is ignored. Verify
	// this asymmetry is intentional at the call sites.
	ret.Skip = b.Skip
	// The merged ParamSet is the union of both inputs' ParamSets.
	ret.ParamSet.AddParamSet(a.ParamSet)
	ret.ParamSet.AddParamSet(b.ParamSet)
	traceLen := len(ret.Header)
	// Copy a's trace values into the merged columns, creating each
	// destination trace at the full merged length first.
	for key, sourceTrace := range a.TraceSet {
		if _, ok := ret.TraceSet[key]; !ok {
			ret.TraceSet[key] = types.NewTrace(traceLen)
		}
		destTrace := ret.TraceSet[key]
		for sourceOffset, sourceValue := range sourceTrace {
			destTrace[aMap[sourceOffset]] = sourceValue
		}
	}
	// Same for b; where a trace key exists in both, b's values overwrite
	// a's in any shared columns.
	for key, sourceTrace := range b.TraceSet {
		if _, ok := ret.TraceSet[key]; !ok {
			ret.TraceSet[key] = types.NewTrace(traceLen)
		}
		destTrace := ret.TraceSet[key]
		for sourceOffset, sourceValue := range sourceTrace {
			destTrace[bMap[sourceOffset]] = sourceValue
		}
	}
	return ret
}
// TraceFilter is a function type that should return true if trace 'tr' should
// be removed from a DataFrame. It is used in FilterOut.
type TraceFilter func(tr types.Trace) bool
// FilterOut removes every trace from d.TraceSet for which the filter
// function 'f' returns true.
//
// Once filtering is complete the ParamSet is rebuilt so that it reflects
// only the traces that remain.
func (d *DataFrame) FilterOut(f TraceFilter) {
	for traceKey, trace := range d.TraceSet {
		if f(trace) {
			delete(d.TraceSet, traceKey)
		}
	}
	d.BuildParamSet()
}
// Slice returns a dataframe that contains a subset of the current dataframe,
// starting from 'offset', the next 'size' num points will be returned as a new
// dataframe. Note that the data is composed of slices of the original data,
// not copies, so the returned dataframe must not be altered.
//
// Returns a non-nil error if the requested range does not fit within the
// dataframe.
func (d *DataFrame) Slice(offset, size int) (*DataFrame, error) {
	// Guard against negative arguments, which would otherwise panic when
	// slicing below.
	if offset < 0 || size < 0 {
		return nil, fmt.Errorf("invalid slice bounds: offset=%d size=%d", offset, size)
	}
	if offset+size > len(d.Header) {
		// Fixed typo ("Slize") and made the error string lowercase and
		// unpunctuated per Go convention.
		return nil, fmt.Errorf("slice exceeds dataframe bounds: offset=%d size=%d len=%d", offset, size, len(d.Header))
	}
	ret := NewEmpty()
	// Sub-slices share backing arrays with d; see the doc comment above.
	ret.Header = d.Header[offset : offset+size]
	for key, tr := range d.TraceSet {
		ret.TraceSet[key] = tr[offset : offset+size]
	}
	ret.BuildParamSet()
	return ret, nil
}
// rangeImpl returns the slices of ColumnHeader and cid.CommitID that are
// needed by DataFrame. The slices are populated from the given
// vcsinfo.IndexCommits.
//
// The value for 'skip', the number of commits skipped, is passed through to
// the return values.
func rangeImpl(resp []*vcsinfo.IndexCommit, skip int) ([]*ColumnHeader, []*cid.CommitID, int) {
	// Pre-size both slices; one entry per commit.
	headers := make([]*ColumnHeader, 0, len(resp))
	commits := make([]*cid.CommitID, 0, len(resp))
	for _, indexCommit := range resp {
		commits = append(commits, &cid.CommitID{
			Source: "master",
			Offset: indexCommit.Index,
		})
		headers = append(headers, &ColumnHeader{
			Source:    "master",
			Offset:    int64(indexCommit.Index),
			Timestamp: indexCommit.Timestamp.Unix(),
		})
	}
	return headers, commits, skip
}
// lastN returns the slices of ColumnHeader and cid.CommitID that are
// needed by DataFrame, covering the last n commits in the repo.
//
// Returns 0 for 'skip', the number of commits skipped.
func lastN(vcs vcsinfo.VCS, n int) ([]*ColumnHeader, []*cid.CommitID, int) {
	commits := vcs.LastNIndex(n)
	return rangeImpl(commits, 0)
}
// getRange returns the slices of ColumnHeader and cid.CommitID that are needed
// by DataFrame. The slices are for the commits that fall in the given time
// range [begin, end).
//
// If 'downsample' is true then the number of commits returned is limited
// to MAX_SAMPLE_SIZE.
//
// The value for 'skip', the number of commits skipped, is also returned.
func getRange(vcs vcsinfo.VCS, begin, end time.Time, downsample bool) ([]*ColumnHeader, []*cid.CommitID, int) {
	commits := vcs.Range(begin, end)
	skip := 0
	if downsample {
		// Reuse the commits already fetched above instead of issuing a
		// second, redundant vcs.Range(begin, end) call.
		commits, skip = DownSample(commits, MAX_SAMPLE_SIZE)
	}
	return rangeImpl(commits, skip)
}
// NewEmpty returns a new empty DataFrame with all of its collection fields
// initialized (non-nil) and Skip set to zero.
func NewEmpty() *DataFrame {
	df := &DataFrame{}
	df.TraceSet = types.TraceSet{}
	df.Header = []*ColumnHeader{}
	df.ParamSet = paramtools.ParamSet{}
	return df
}
// NewHeaderOnly returns a DataFrame with a populated Header for the commits
// in [begin, end), with no traces.
//
// If 'downsample' is true then the number of commits returned is limited
// to MAX_SAMPLE_SIZE.
func NewHeaderOnly(vcs vcsinfo.VCS, begin, end time.Time, downsample bool) *DataFrame {
	defer timer.New("NewHeaderOnly time").Stop()
	headers, _, skipped := getRange(vcs, begin, end, downsample)
	return &DataFrame{
		Header:   headers,
		Skip:     skipped,
		TraceSet: types.TraceSet{},
		ParamSet: paramtools.ParamSet{},
	}
}