-
Notifications
You must be signed in to change notification settings - Fork 1
/
checks.go
232 lines (196 loc) · 6.89 KB
/
checks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package bigquery
import (
"context"
"encoding/json"
"fmt"
"strconv"
"github.com/bruin-data/bruin/pkg/helpers"
"github.com/bruin-data/bruin/pkg/query"
"github.com/bruin-data/bruin/pkg/scheduler"
"github.com/pkg/errors"
)
// NotNullCheck is a column check that fails when the checked column holds
// NULL values in the asset's table.
type NotNullCheck struct {
// conn is passed on to the countableQueryCheck that runs the query, which
// uses it to obtain the BigQuery connection by name.
conn connectionFetcher
}
// ensureCountZero turns a query result set into a single integer by
// delegating to helpers.CastResultToInteger and returns whatever that helper
// returns.
//
// Despite its name it performs no comparison against zero; the caller is the
// one that compares the returned count with its expected value.
func ensureCountZero(res [][]interface{}) (int64, error) {
	count, err := helpers.CastResultToInteger(res)
	return count, err
}
// Check counts the rows of the asset's table in which the column IS NULL and
// hands the query to a countableQueryCheck. No expected result is set, so the
// check passes only when the count is zero; otherwise the returned error
// reports how many null values were found.
func (c *NotNullCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	table := ti.GetAsset().Name
	column := ti.Column.Name

	check := countableQueryCheck{
		conn:      c.conn,
		checkName: "not_null",
		queryInstance: &query.Query{
			Query: fmt.Sprintf("SELECT count(*) FROM %s WHERE %s IS NULL", table, column),
		},
		customError: func(nulls int64) error {
			return errors.Errorf("column %s has %d null values", column, nulls)
		},
	}
	return check.Check(ctx, ti)
}
// PositiveCheck is a column check that fails when the checked column holds
// values that are zero or negative.
type PositiveCheck struct {
	// conn is forwarded to the countableQueryCheck that executes the query.
	conn connectionFetcher
}

// Check counts the rows of the asset's table whose column value is <= 0 and
// delegates to a countableQueryCheck. With no expected result set, any
// non-zero count produces an error reporting the number of offending values.
func (c *PositiveCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	table := ti.GetAsset().Name
	column := ti.Column.Name

	check := countableQueryCheck{
		conn:      c.conn,
		checkName: "positive",
		queryInstance: &query.Query{
			Query: fmt.Sprintf("SELECT count(*) FROM %s WHERE %s <= 0", table, column),
		},
		customError: func(offending int64) error {
			return errors.Errorf("column %s has %d non-positive values", column, offending)
		},
	}
	return check.Check(ctx, ti)
}
// NonNegativeCheck is a column check that fails when the checked column holds
// values below zero.
type NonNegativeCheck struct {
	// conn is forwarded to the countableQueryCheck that executes the query.
	conn connectionFetcher
}

// Check counts the rows of the asset's table whose column value is < 0 and
// delegates to a countableQueryCheck. With no expected result set, any
// non-zero count produces an error reporting the number of negative values.
func (c *NonNegativeCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	table := ti.GetAsset().Name
	column := ti.Column.Name

	check := countableQueryCheck{
		conn:      c.conn,
		checkName: "non_negative",
		queryInstance: &query.Query{
			Query: fmt.Sprintf("SELECT count(*) FROM %s WHERE %s < 0", table, column),
		},
		customError: func(offending int64) error {
			return errors.Errorf("column %s has %d negative values", column, offending)
		},
	}
	return check.Check(ctx, ti)
}
// NegativeCheck is a column check that fails when the checked column holds
// values that are zero or positive.
type NegativeCheck struct {
	// conn is forwarded to the countableQueryCheck that executes the query.
	conn connectionFetcher
}

// Check counts the rows of the asset's table whose column value is >= 0 and
// delegates to a countableQueryCheck. With no expected result set, any
// non-zero count produces an error reporting the number of offending values.
func (c *NegativeCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	table := ti.GetAsset().Name
	column := ti.Column.Name

	check := countableQueryCheck{
		conn:      c.conn,
		checkName: "negative",
		queryInstance: &query.Query{
			Query: fmt.Sprintf("SELECT count(*) FROM %s WHERE %s >= 0", table, column),
		},
		customError: func(offending int64) error {
			return errors.Errorf("column %s has %d non negative values", column, offending)
		},
	}
	return check.Check(ctx, ti)
}
// UniqueCheck is a column check that fails when the checked column holds
// repeated values.
type UniqueCheck struct {
	// conn is forwarded to the countableQueryCheck that executes the query.
	conn connectionFetcher
}

// Check queries the difference between COUNT(column) and
// COUNT(DISTINCT column) on the asset's table and delegates to a
// countableQueryCheck. With no expected result set, any non-zero difference
// produces an error reporting that number.
func (c *UniqueCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	column := ti.Column.Name
	table := ti.GetAsset().Name

	check := countableQueryCheck{
		conn:      c.conn,
		checkName: "unique",
		queryInstance: &query.Query{
			Query: fmt.Sprintf("SELECT COUNT(%s) - COUNT(DISTINCT %s) FROM %s", column, column, table),
		},
		customError: func(duplicates int64) error {
			return errors.Errorf("column %s has %d non-unique values", column, duplicates)
		},
	}
	return check.Check(ctx, ti)
}
// PatternCheck is a column check that fails when the checked column holds
// values that do not match a regular expression.
type PatternCheck struct {
	// conn is forwarded to the countableQueryCheck that executes the query.
	conn connectionFetcher
}

// Check counts the rows of the asset's table whose column value does NOT
// match the configured pattern and delegates to a countableQueryCheck. With no
// expected result set, any non-zero count produces an error reporting the
// number of offending values.
//
// It returns an error straight away when the check value is not a string.
//
// The query must count violations (NOT REGEXP_CONTAINS): the shared
// countableQueryCheck treats every non-zero count as a failure, so counting
// the matching rows would reject conforming data and accept data in which
// nothing matches.
func (c *PatternCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	if ti.Check.Value.String == nil {
		return errors.Errorf("unexpected value %s for pattern check, the value must be a string", ti.Check.Value.ToString())
	}
	pattern := *ti.Check.Value.String

	// The pattern is interpolated verbatim into a raw string literal (r'...').
	// NOTE(review): rows where the column is NULL are presumably filtered out by
	// the WHERE clause rather than counted as violations — confirm against
	// BigQuery semantics and pair with a not_null check if NULLs must fail.
	qq := fmt.Sprintf(
		"SELECT count(*) FROM %s WHERE NOT REGEXP_CONTAINS(%s, r'%s')",
		ti.GetAsset().Name,
		ti.Column.Name,
		pattern,
	)

	return (&countableQueryCheck{
		conn:          c.conn,
		queryInstance: &query.Query{Query: qq},
		checkName:     "pattern",
		customError: func(count int64) error {
			return errors.Errorf("column %s has %d values that don't satisfy the pattern %s", ti.Column.Name, count, pattern)
		},
	}).Check(ctx, ti)
}
// AcceptedValuesCheck is a column check that fails when the checked column
// holds values outside a configured list.
type AcceptedValuesCheck struct {
	// conn is forwarded to the countableQueryCheck that executes the query.
	conn connectionFetcher
}

// Check counts the rows of the asset's table whose column value, cast to
// STRING, is NOT IN the configured list, and delegates to a
// countableQueryCheck. With no expected result set, any non-zero count
// produces an error reporting the number of offending rows.
//
// The list comes from the check value's string array when present, otherwise
// from its int array with every entry rendered in decimal. An error is
// returned before any query is built when neither array is set, or when an
// array that is set is empty.
func (c *AcceptedValuesCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	strs, ints := ti.Check.Value.StringArray, ti.Check.Value.IntArray

	if strs == nil && ints == nil {
		return errors.Errorf("unexpected value for accepted_values check, the values must to be an array, instead %T", ti.Check.Value)
	}
	if (strs != nil && len(*strs) == 0) || (ints != nil && len(*ints) == 0) {
		return errors.Errorf("no values provided for accepted_values check")
	}

	var accepted []string
	switch {
	case strs != nil:
		accepted = *strs
	default:
		for _, n := range *ints {
			accepted = append(accepted, strconv.Itoa(n))
		}
	}

	encoded, err := json.Marshal(accepted)
	if err != nil {
		return errors.Wrap(err, "failed to marshal accepted values for the query result")
	}

	// Strip the enclosing '[' and ']' of the JSON array so that only the
	// comma-separated, double-quoted items remain for the IN (...) list.
	items := encoded[1 : len(encoded)-1]

	stmt := fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE CAST(%s as STRING) NOT IN (%s)", ti.GetAsset().Name, ti.Column.Name, items)

	check := countableQueryCheck{
		conn:          c.conn,
		checkName:     "accepted_values",
		queryInstance: &query.Query{Query: stmt},
		customError: func(offending int64) error {
			return errors.Errorf("column %s has %d rows that are not in the accepted values", ti.Column.Name, offending)
		},
	}
	return check.Check(ctx, ti)
}
// countableQueryCheck runs a query that yields a single integer and compares
// that integer with an expected value. It is the shared implementation behind
// the column checks and the custom check in this file.
type countableQueryCheck struct {
// conn supplies the BigQuery connection, looked up by connection name.
conn connectionFetcher
// expectedQueryResult is the value the query result must equal for the check
// to pass. Left unset it is 0, which is how the column checks use it.
expectedQueryResult int64
// queryInstance is the query that gets executed.
queryInstance *query.Query
// checkName identifies the check in wrapped error messages.
checkName string
// customError builds the error returned when the result differs from
// expectedQueryResult; it receives the actual result.
customError func(count int64) error
}
// Check resolves the connection name for the column check's asset through the
// pipeline and then runs the query against it. A failure to resolve the
// connection name is returned unchanged.
func (c *countableQueryCheck) Check(ctx context.Context, ti *scheduler.ColumnCheckInstance) error {
	connectionName, err := ti.Pipeline.GetConnectionNameForAsset(ti.GetAsset())
	if err != nil {
		return err
	}

	return c.check(ctx, connectionName)
}
// CustomCheck is the counterpart of Check for custom check instances: it
// resolves the connection name for the instance's asset through the pipeline
// and then runs the query against it. A failure to resolve the connection name
// is returned unchanged.
func (c *countableQueryCheck) CustomCheck(ctx context.Context, ti *scheduler.CustomCheckInstance) error {
	connectionName, err := ti.Pipeline.GetConnectionNameForAsset(ti.GetAsset())
	if err != nil {
		return err
	}

	return c.check(ctx, connectionName)
}
// check obtains the BigQuery connection named connectionName, executes the
// configured query, converts the result to an integer and compares it with
// expectedQueryResult.
//
// It returns nil when the two are equal and the error built by customError
// when they differ. Failures while fetching the connection, running the query
// or parsing the result are wrapped with the check name.
func (c *countableQueryCheck) check(ctx context.Context, connectionName string) error {
	client, err := c.conn.GetBqConnection(connectionName)
	if err != nil {
		return errors.Wrapf(err, "failed to get connection for '%s' check", c.checkName)
	}

	rows, err := client.Select(ctx, c.queryInstance)
	if err != nil {
		return errors.Wrapf(err, "failed '%s' check", c.checkName)
	}

	got, err := ensureCountZero(rows)
	if err != nil {
		return errors.Wrapf(err, "failed to parse '%s' check result", c.checkName)
	}

	if got == c.expectedQueryResult {
		return nil
	}
	return c.customError(got)
}
// CustomCheck runs a user-supplied query and compares its integer result with
// the value configured on the check.
type CustomCheck struct {
	// conn is forwarded to the countableQueryCheck that executes the query.
	conn connectionFetcher
}

// Check executes the custom check's query through a countableQueryCheck,
// using the check's value as the expected result and the check's name in
// error messages. When the result differs, the returned error states both the
// actual and the expected value.
func (c *CustomCheck) Check(ctx context.Context, ti *scheduler.CustomCheckInstance) error {
	name := ti.Check.Name
	expected := ti.Check.Value

	check := countableQueryCheck{
		conn:                c.conn,
		checkName:           name,
		expectedQueryResult: expected,
		queryInstance:       &query.Query{Query: ti.Check.Query},
		customError: func(got int64) error {
			return errors.Errorf("custom check '%s' has returned %d instead of the expected %d", name, got, expected)
		},
	}
	return check.CustomCheck(ctx, ti)
}