/
bigquery.go
109 lines (85 loc) · 2.49 KB
/
bigquery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package gcp
import (
"context"
"fmt"
"cloud.google.com/go/bigquery"
"github.com/Jeffail/benthos/v3/public/service"
"github.com/Masterminds/squirrel"
)
// bigqueryIterator is the iteration contract used for query results in this
// package. RunQuery returns the value produced by reading a finished job
// through this interface.
type bigqueryIterator interface {
	// Next takes a destination value and reports an error when it cannot
	// fill it. NOTE(review): presumably mirrors bigquery.RowIterator.Next,
	// including its end-of-iteration sentinel error; confirm against the
	// upstream documentation.
	Next(dst interface{}) error
}
// bqClient is the set of BigQuery operations this package depends on. Within
// this file it is satisfied by wrappedBQClient (see wrapBQClient).
type bqClient interface {
	// RunQuery executes the query described by options and returns an
	// iterator over its results, or an error.
	RunQuery(ctx context.Context, options *bqQueryBuilderOptions) (bigqueryIterator, error)

	// Close shuts the client down and reports any error from doing so.
	Close() error
}
// wrapBQClient adapts a *bigquery.Client to the bqClient interface. The logger
// is stored alongside the client so RunQuery can log the ID of each job it
// starts.
func wrapBQClient(client *bigquery.Client, logger *service.Logger) bqClient {
	adapter := new(wrappedBQClient)
	adapter.wrapped = client
	adapter.logger = logger
	return adapter
}
// wrappedBQClient is the bqClient implementation backed by a real
// *bigquery.Client.
type wrappedBQClient struct {
	// wrapped is the underlying client used to build queries and is the
	// client closed by Close.
	wrapped *bigquery.Client

	// logger receives a debug entry carrying the job ID for every job
	// started by RunQuery.
	logger *service.Logger
}
// RunQuery builds a query from options, starts it as a BigQuery job, waits for
// that job to finish and returns an iterator over the job's rows.
//
// Every failure is wrapped with a message naming the step that failed:
// building the query, starting the job, waiting on it, the job's own final
// status, or opening the row iterator. On any failure the returned iterator is
// nil.
func (client *wrappedBQClient) RunQuery(ctx context.Context, options *bqQueryBuilderOptions) (bigqueryIterator, error) {
	q, buildErr := buildBQQuery(client.wrapped, options)
	if buildErr != nil {
		return nil, fmt.Errorf("failed to build query: %w", buildErr)
	}

	running, runErr := q.Run(ctx)
	if runErr != nil {
		return nil, fmt.Errorf("failed to run query: %w", runErr)
	}

	// Attach the job ID to the log entry so the job can be traced later.
	jobLogger := client.logger.With("job_id", running.ID())
	jobLogger.Debug("running bigquery job")

	final, waitErr := running.Wait(ctx)
	if waitErr != nil {
		return nil, fmt.Errorf("failed to wait on job: %w", waitErr)
	}

	// The wait error and the job's own outcome are reported separately, so
	// the returned status has to be inspected as well.
	jobErr := final.Err()
	if jobErr != nil {
		return nil, fmt.Errorf("failed to complete job successfully: %w", jobErr)
	}

	rows, readErr := running.Read(ctx)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read rows: %w", readErr)
	}

	return rows, nil
}
// Close closes the wrapped BigQuery client and returns whatever error that
// call reports.
func (client *wrappedBQClient) Close() error {
	closeErr := client.wrapped.Close()
	return closeErr
}
// bqQueryParts holds the textual pieces from which buildBQQuery assembles a
// SELECT statement.
type bqQueryParts struct {
	// table is the table name; buildBQQuery wraps it in backticks and uses
	// it as the FROM clause.
	table string

	// columns are the expressions passed to SELECT.
	columns []string

	// where is the WHERE predicate. It is passed to the builder together
	// with the arguments from bqQueryBuilderOptions.
	where string

	// prefix, when non-empty, is passed to the builder's Prefix method.
	prefix string

	// suffix, when non-empty, is passed to the builder's Suffix method.
	suffix string
}
// bqQueryBuilderOptions carries everything buildBQQuery needs to produce a
// query: the statement pieces, the job labels and the positional arguments.
type bqQueryBuilderOptions struct {
	// queryParts supplies the table, columns, predicate, prefix and suffix
	// of the statement.
	queryParts *bqQueryParts

	// jobLabels is assigned as-is to the Labels field of the built query.
	jobLabels map[string]string

	// args are the values bound to the placeholders in queryParts.where;
	// the arguments the builder returns become the query's parameters.
	args []interface{}
}
// buildBQQuery assembles a SELECT statement from options and returns it as a
// query created on client.
//
// The statement selects queryParts.columns from the backtick-quoted
// queryParts.table, filtered by queryParts.where with options.args bound to
// its "?" placeholders. A non-empty prefix or suffix is added through the
// builder's Prefix and Suffix methods. options.jobLabels is attached to the
// query, and each argument returned by the builder becomes one positional
// query parameter.
//
// An error is returned when options or options.queryParts is nil, or when the
// builder cannot render the statement.
func buildBQQuery(client *bigquery.Client, options *bqQueryBuilderOptions) (*bigquery.Query, error) {
	// Both pointers are dereferenced below; report a missing one as an error
	// instead of panicking, since this function already has an error return.
	if options == nil || options.queryParts == nil {
		return nil, fmt.Errorf("query options and query parts must not be nil")
	}
	queryParts := options.queryParts

	builder := squirrel.
		Select(queryParts.columns...).
		From(fmt.Sprintf("`%s`", queryParts.table)).
		Where(queryParts.where, options.args...)

	if queryParts.prefix != "" {
		builder = builder.Prefix(queryParts.prefix)
	}
	if queryParts.suffix != "" {
		builder = builder.Suffix(queryParts.suffix)
	}

	// Render with "?" placeholders; the matching values are supplied below as
	// nameless (positional) query parameters.
	qs, args, err := builder.PlaceholderFormat(squirrel.Question).ToSql()
	if err != nil {
		return nil, fmt.Errorf("failed to build query string: %w", err)
	}

	query := client.Query(qs)
	query.Labels = options.jobLabels

	bqparams := make([]bigquery.QueryParameter, 0, len(args))
	for _, arg := range args {
		bqparams = append(bqparams, bigquery.QueryParameter{Value: arg})
	}
	query.Parameters = bqparams

	return query, nil
}