-
Notifications
You must be signed in to change notification settings - Fork 127
/
groupjobs.go
151 lines (137 loc) · 4.06 KB
/
groupjobs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package repository
import (
"fmt"
"strings"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/pkg/errors"
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/lookoutv2/model"
)
// GroupByResult is the value returned by a GroupBy query.
type GroupByResult struct {
// Groups holds one entry per row returned by the grouping query, in the
// order the rows were read.
Groups []*model.JobGroup
}
// GroupJobsRepository describes a store that can group jobs by a field and
// return the groups together with requested aggregates.
//
// NOTE(review): the GroupBy signature declared here (no activeJobSets
// parameter, groupedField as a string) differs from the GroupBy method defined
// on *SqlGroupJobsRepository in this file (activeJobSets bool, groupedField
// *model.GroupedField), so that type does not satisfy this interface as
// written. Confirm which signature is intended before relying on it.
type GroupJobsRepository interface {
// GroupBy returns the job groups matching filters, grouped by groupedField.
// skip and take are presumably pagination offset and limit; verify against
// the implementation.
GroupBy(
ctx *armadacontext.Context,
filters []*model.Filter,
order *model.Order,
groupedField string,
aggregates []string,
skip int,
take int,
) (*GroupByResult, error)
}
// SqlGroupJobsRepository runs job grouping queries against a database reached
// through a pgx connection pool.
type SqlGroupJobsRepository struct {
// db is the connection pool on which query transactions are started.
db *pgxpool.Pool
// lookoutTables is handed to the query builder when constructing SQL.
lookoutTables *LookoutTables
}
// stateAggregatePrefix marks aggregate fields that carry a per-state count.
// When scanning a group, the text following this prefix in the field name is
// used as the state name.
const stateAggregatePrefix = "state_"
// NewSqlGroupJobsRepository builds a SqlGroupJobsRepository that queries through
// db and uses a freshly created table description from NewTables.
func NewSqlGroupJobsRepository(db *pgxpool.Pool) *SqlGroupJobsRepository {
	repo := new(SqlGroupJobsRepository)
	repo.db = db
	repo.lookoutTables = NewTables()
	return repo
}
// GroupBy builds a grouping query from the given arguments, executes it and
// returns the resulting job groups.
//
// All arguments except ctx are forwarded unchanged to the query builder's
// GroupBy; filters, groupedField and aggregates are additionally used to decode
// the returned rows. The query runs inside a read-only, deferrable transaction
// at repeatable-read isolation.
//
// An error is returned if the query cannot be built, the transaction fails, or
// any row cannot be decoded.
func (r *SqlGroupJobsRepository) GroupBy(
	ctx *armadacontext.Context,
	filters []*model.Filter,
	activeJobSets bool,
	order *model.Order,
	groupedField *model.GroupedField,
	aggregates []string,
	skip int,
	take int,
) (*GroupByResult, error) {
	builder := NewQueryBuilder(r.lookoutTables)
	query, err := builder.GroupBy(filters, activeJobSets, order, groupedField, aggregates, skip, take)
	if err != nil {
		return nil, err
	}
	logQuery(query, "GroupBy")

	txOptions := pgx.TxOptions{
		IsoLevel:       pgx.RepeatableRead,
		AccessMode:     pgx.ReadOnly,
		DeferrableMode: pgx.Deferrable,
	}

	result := &GroupByResult{}
	// readGroups executes the query on the transaction and decodes its rows
	// into result.
	readGroups := func(tx pgx.Tx) error {
		rows, queryErr := tx.Query(ctx, query.Sql, query.Args...)
		if queryErr != nil {
			return queryErr
		}
		parsed, parseErr := rowsToGroups(rows, groupedField, aggregates, filters)
		result.Groups = parsed
		return parseErr
	}

	if txErr := pgx.BeginTxFunc(ctx, r.db, txOptions, readGroups); txErr != nil {
		return nil, txErr
	}
	return result, nil
}
// rowsToGroups drains rows, decoding each row into a model.JobGroup via
// scanGroup, and returns the groups in the order they were read.
//
// rows is always closed before returning. An error is returned if any row
// fails to decode or if iteration stopped because of a read error; in either
// case no groups are returned.
func rowsToGroups(rows pgx.Rows, groupedField *model.GroupedField, aggregates []string, filters []*model.Filter) ([]*model.JobGroup, error) {
	// Release the underlying connection even when a row fails to decode part
	// way through the result set. Closing already-closed rows is safe.
	defer rows.Close()

	var groups []*model.JobGroup
	for rows.Next() {
		jobGroup, err := scanGroup(rows, groupedField.Field, aggregates, filters)
		if err != nil {
			return nil, err
		}
		groups = append(groups, jobGroup)
	}
	// Next returning false can mean either end of data or a read failure;
	// Err distinguishes the two so a truncated result is not reported as success.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return groups, nil
}
// scanGroup decodes the current row of rows into a model.JobGroup.
//
// The row is scanned into, in order: the group value (parsed by the parser
// ParserForGroup returns for field), the group's row count, and one column per
// parser returned by ParsersForAggregate for each entry of aggregates.
//
// Aggregates whose field name starts with stateAggregatePrefix are folded into
// a single map[string]int stored under stateField, keyed by the field name
// with the prefix removed. Every other aggregate is stored under its own field
// name.
//
// An error is returned if the aggregate parsers cannot be built, the row cannot
// be scanned, or any value fails to parse.
func scanGroup(rows pgx.Rows, field string, aggregates []string, filters []*model.Filter) (*model.JobGroup, error) {
	groupParser := ParserForGroup(field)
	var count int64

	var aggregateParsers []FieldParser
	for _, aggregate := range aggregates {
		parsers, err := ParsersForAggregate(aggregate, filters)
		if err != nil {
			return nil, err
		}
		aggregateParsers = append(aggregateParsers, parsers...)
	}

	aggregateRefs := make([]interface{}, len(aggregateParsers))
	for i, parser := range aggregateParsers {
		aggregateRefs[i] = parser.GetVariableRef()
	}

	// Scan destinations must follow the column order: group, count, aggregates.
	varAddresses := slices.Concatenate([]interface{}{groupParser.GetVariableRef(), &count}, aggregateRefs)
	if err := rows.Scan(varAddresses...); err != nil {
		return nil, err
	}

	parsedGroup, err := groupParser.ParseValue()
	if err != nil {
		return nil, err
	}

	aggregatesMap := make(map[string]interface{})
	for _, parser := range aggregateParsers {
		aggregateField := parser.GetField()
		val, err := parser.ParseValue()
		if err != nil {
			return nil, errors.Wrapf(err, "failed to parse value for field %s", aggregateField)
		}

		if !strings.HasPrefix(aggregateField, stateAggregatePrefix) {
			aggregatesMap[aggregateField] = val
			continue
		}

		singleStateCount, ok := val.(int)
		if !ok {
			// Report the value that failed the assertion, not the zero int
			// produced by the failed assertion itself.
			return nil, errors.Errorf("failed to parse value for state aggregate: cannot convert value to int: %v: %T", val, val)
		}

		// All per-state counts share one map, created on first use.
		stateCountsVal, ok := aggregatesMap[stateField]
		if !ok {
			stateCountsVal = map[string]int{}
			aggregatesMap[stateField] = stateCountsVal
		}
		stateCounts, ok := stateCountsVal.(map[string]int)
		if !ok {
			return nil, errors.Errorf("failed to parse value for state aggregate: cannot cast state counts to map")
		}
		state := aggregateField[len(stateAggregatePrefix):]
		stateCounts[state] = singleStateCount
	}

	return &model.JobGroup{
		Name:       fmt.Sprintf("%s", parsedGroup),
		Count:      count,
		Aggregates: aggregatesMap,
	}, nil
}