-
Notifications
You must be signed in to change notification settings - Fork 130
/
platform_load.go
153 lines (134 loc) · 4.77 KB
/
platform_load.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package metrics
import (
"fmt"
"log"
"strings"
"time"
"github.com/bazelbuild/continuous-integration/metrics/clients"
"github.com/bazelbuild/continuous-integration/metrics/data"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredres "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
// baseMetricType is the common prefix of every metric type built by createTimeSeries.
const baseMetricType = "custom.googleapis.com/bazel/ci"
// PlatformLoad is a metric that counts, per organization and worker platform,
// the unfinished jobs that are running or waiting for a worker.
type PlatformLoad struct {
// client is used to fetch the most recent builds of each organization.
client clients.BuildkiteClient
// orgs lists the organizations whose builds are inspected.
orgs []string
// columns describes the columns of the data set produced by Collect.
columns []Column
// builds is the number of most recent builds requested per organization.
builds int
}
// Name returns the fixed identifier of this metric, "platform_load".
func (pl *PlatformLoad) Name() string {
	const metricName = "platform_load"
	return metricName
}
// Columns returns the column definitions this metric was created with.
// The returned slice is the metric's own slice, not a copy.
func (pl *PlatformLoad) Columns() []Column {
	cols := pl.columns
	return cols
}
// Type reports that this metric is a TimeBasedMetric.
func (*PlatformLoad) Type() MetricType {
	var kind MetricType = TimeBasedMetric
	return kind
}
// RelevantDelta returns the length of two days, expressed in seconds.
func (*PlatformLoad) RelevantDelta() int {
	const (
		secondsPerDay = 24 * 60 * 60
		days          = 2
	)
	return days * secondsPerDay
}
// Collect fetches the most recent builds of every configured organization and
// counts, per worker platform, the unfinished jobs that are running and those
// that are waiting for a worker.
//
// The result contains one row per organization and platform for which at least
// one unfinished job was seen. All rows share the timestamp taken when
// collection started. If the builds of any organization cannot be fetched, a
// nil data set and an error are returned.
func (pl *PlatformLoad) Collect() (data.DataSet, error) {
	result := &loadDataSet{headers: GetColumnNames(pl.columns), ts: time.Now(), rows: make([]*loadDataRow, 0)}
	for _, org := range pl.orgs {
		// NOTE(review): the "all" slug presumably selects every pipeline of the organization - confirm against the client.
		pid := &data.PipelineID{Org: org, Slug: "all"}
		builds, err := pl.client.GetMostRecentBuilds(pid, pl.builds)
		if err != nil {
			return nil, fmt.Errorf("Cannot get builds to determine platform load: %v", err)
		}

		allPlatforms := make(map[string]bool)
		waiting := make(map[string]int)
		running := make(map[string]int)
		for _, build := range builds {
			for _, job := range build.Jobs {
				// Do not use getPlatform() since it may return "rbe", but here we're only interested in the actual worker OS (which would be "linux" in the rbe case).
				platform := getPlatformFromAgentQueryRules(job.AgentQueryRules)
				// Skip jobs without a platform, jobs that were never created and jobs that already finished.
				if platform == "" || job.CreatedAt == nil || job.FinishedAt != nil {
					continue
				}

				allPlatforms[platform] = true
				// State is a pointer; a job without a state still registers its
				// platform (like any other uncounted state) but must not be dereferenced.
				if job.State == nil {
					continue
				}
				switch *job.State {
				case "running":
					running[platform]++
				case "scheduled", "runnable":
					/*
						State "scheduled" / "runnable" = waiting for a worker to become available
						State "waiting" / "waiting_failed" = waiting for another task to finish

						We're only interested in "scheduled" and "runnable" jobs since they may indicate a shortage of workers.
					*/
					waiting[platform]++
				}
			}
		}

		// Platforms seen only in uncounted states produce a row with zero counts.
		for platform := range allPlatforms {
			row := &loadDataRow{org: org, platform: platform, waitingJobs: waiting[platform], runningJobs: running[platform]}
			result.rows = append(result.rows, row)
		}
	}
	return result, nil
}
// CreatePlatformLoad returns a PlatformLoad metric that inspects the given
// number of most recent builds for each of the given organizations, using
// client to fetch them.
//
// The metric's columns match the following table schema:
// CREATE TABLE platform_load (timestamp DATETIME, org VARCHAR(255), platform VARCHAR(255), waiting_jobs INT, running_jobs INT, PRIMARY KEY(org, timestamp, platform));
func CreatePlatformLoad(client clients.BuildkiteClient, builds int, orgs ...string) *PlatformLoad {
	load := &PlatformLoad{client: client, orgs: orgs, builds: builds}
	// NOTE(review): the boolean presumably marks key columns, as it is true exactly for the PRIMARY KEY columns above - confirm against Column.
	load.columns = []Column{
		{"timestamp", true},
		{"org", true},
		{"platform", true},
		{"waiting_jobs", false},
		{"running_jobs", false},
	}
	return load
}
// loadDataRow holds the job counts of one platform within one organization.
type loadDataRow struct {
// org is the organization the counted jobs belong to.
org string
// platform is the worker platform the counted jobs target.
platform string
// waitingJobs is the number of unfinished jobs in state "scheduled" or "runnable".
waitingJobs int
// runningJobs is the number of unfinished jobs in state "running".
runningJobs int
}
// loadDataSet is the result of one PlatformLoad collection run.
type loadDataSet struct {
// headers are the column names passed to data.CreateDataSet by GetData.
headers []string
// ts is the single timestamp shared by all rows of this data set.
ts time.Time
// rows holds one entry per organization and platform.
rows []*loadDataRow
}
// GetData converts the data set into a legacy data set. Each row becomes
// [timestamp, org, platform, waiting jobs, running jobs], where the timestamp
// is the one shared by the whole data set.
func (lds *loadDataSet) GetData() *data.LegacyDataSet {
	legacy := data.CreateDataSet(lds.headers)
	for i := range lds.rows {
		r := lds.rows[i]
		legacy.Data = append(legacy.Data, []interface{}{lds.ts, r.org, r.platform, r.waitingJobs, r.runningJobs})
	}
	return legacy
}
func (lds *loadDataSet) CreateTimeSeriesRequest(projectID string) *monitoringpb.CreateTimeSeriesRequest {
ts := ×tamp.Timestamp{
Seconds: lds.ts.Unix(),
}
series := make([]*monitoringpb.TimeSeries, len(lds.rows)*3)
for i, row := range lds.rows {
series[3*i] = createTimeSeries(ts, row.org, row.platform, "waiting_jobs", row.waitingJobs)
series[3*i+1] = createTimeSeries(ts, row.org, row.platform, "running_jobs", row.runningJobs)
series[3*i+2] = createTimeSeries(ts, row.org, row.platform, "required_workers", row.waitingJobs+row.runningJobs)
}
return &monitoringpb.CreateTimeSeriesRequest{
Name: "projects/" + projectID,
TimeSeries: series,
}
}
// createTimeSeries builds a time series holding a single int64 point whose
// interval starts and ends at ts. The metric type is
// baseMetricType/org/platform/metricType with every dash replaced by an
// underscore, and the monitored resource type is "global". The resulting
// metric type is logged.
func createTimeSeries(ts *timestamp.Timestamp, org, platform, metricType string, value int) *monitoringpb.TimeSeries {
	fullType := strings.Replace(
		fmt.Sprintf("%s/%s/%s/%s", baseMetricType, org, platform, metricType),
		"-", "_", -1,
	)
	log.Printf("Publishing time series for metric '%s'\n", fullType)

	point := &monitoringpb.Point{
		Interval: &monitoringpb.TimeInterval{StartTime: ts, EndTime: ts},
		Value: &monitoringpb.TypedValue{
			Value: &monitoringpb.TypedValue_Int64Value{Int64Value: int64(value)},
		},
	}
	return &monitoringpb.TimeSeries{
		Metric:   &metricpb.Metric{Type: fullType},
		Resource: &monitoredres.MonitoredResource{Type: "global"},
		Points:   []*monitoringpb.Point{point},
	}
}