bq.go
// Package bq provides a wrapper for Google's BigQuery library as well as
// general setup of the BigQuery client and streaming inserts using bqstreamer.
package bq

import (
	"encoding/base64"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"time"

	"cloud.google.com/go/bigquery"
	"github.com/getconversio/go-utils/util"
	log "github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"golang.org/x/oauth2/google"
	"golang.org/x/oauth2/jwt"
	"google.golang.org/api/option"
	bqstreamer "gopkg.in/rounds/go-bqstreamer.v2"
)
// BigQueryWrapper wraps a BigQuery client together with the project, dataset
// and optional table prefix it operates on.
type BigQueryWrapper struct {
Client *bigquery.Client
ProjectId string
DatasetId string
TablePrefix string
}
// insertWorker is the shared asynchronous worker group used for streaming inserts.
var insertWorker *bqstreamer.AsyncWorkerGroup
// Dataset returns a handle to the wrapper's dataset.
func (w *BigQueryWrapper) Dataset() *bigquery.Dataset {
return w.Client.Dataset(w.DatasetId)
}
// TableId returns the given table ID, prepending the wrapper's table prefix if one is set.
func (w *BigQueryWrapper) TableId(tableId string) string {
if w.TablePrefix != "" {
tableId = fmt.Sprintf("%s_%s", w.TablePrefix, tableId)
}
return tableId
}
// Table returns a handle to the given table in the wrapper's dataset.
func (w *BigQueryWrapper) Table(tableId string) *bigquery.Table {
tableId = w.TableId(tableId)
return w.Dataset().Table(tableId)
}
// AddRow encodes a row and enqueues it for asynchronous streaming insert into
// the given table. SetupStreamingInserts must be called before the first AddRow.
func (w *BigQueryWrapper) AddRow(tableId string, row interface{}) error {
data, err := EncodeLegacy(row, true)
if err != nil {
return err
}
log.Debugf("BigQuery mapped row for %s: %#v", tableId, data)
mappedRow := bqstreamer.NewRow(w.ProjectId, w.DatasetId, w.TableId(tableId), data)
insertWorker.Enqueue(mappedRow)
return nil
}
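
// A minimal sketch of enqueueing a row (hypothetical caller code; assumes
// SetupStreamingInserts has been called and an "events" table exists):
//
//	if err := wrapper.AddRow("events", myEvent); err != nil {
//		log.Error("could not enqueue row: ", err)
//	}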
// UseTablePrefix enables or disables a per-host table prefix: when enabled,
// table IDs are prefixed with a hash of the current hostname.
func (w *BigQueryWrapper) UseTablePrefix(useIt bool) {
if useIt {
hostname, err := os.Hostname()
util.PanicOnError("Cannot get hostname", err)
w.TablePrefix = util.Hash32(hostname)
} else {
w.TablePrefix = ""
}
}
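
// For example, a test setup might isolate its tables per machine
// (hypothetical caller code; the hash value varies by host):
//
//	wrapper.UseTablePrefix(true)
//	wrapper.Table("events") // resolves to "<hostname-hash>_events"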
// NewBigQueryWrapper creates a new wrapper for a BigQuery client.
func NewBigQueryWrapper(client *bigquery.Client, projectId, datasetId string) *BigQueryWrapper {
wrapper := BigQueryWrapper{
Client: client,
ProjectId: projectId,
DatasetId: datasetId,
}
return &wrapper
}
// bigqueryJWTConfig builds JWT credentials from the BIGQUERY_SERVICE_ACCOUNT
// environment variable, which holds either a path to a JSON keyfile or a
// base64 encoded JSON string.
func bigqueryJWTConfig() (*jwt.Config, error) {
saFile := os.Getenv("BIGQUERY_SERVICE_ACCOUNT")
if strings.HasSuffix(saFile, ".json") {
log.Debug("Using JSON file for BigQuery credentials")
saData, err := ioutil.ReadFile(saFile)
util.PanicOnError("Cannot read keyfile", err)
return google.JWTConfigFromJSON(saData, bigquery.Scope)
} else {
		// The value is not a file path; assume it is a base64 encoded JSON string.
log.Debug("Using configuration value for BigQuery credentials")
saData, err := base64.StdEncoding.DecodeString(saFile)
util.PanicOnError("Cannot base64 decode BigQuery credentials", err)
return google.JWTConfigFromJSON(saData, bigquery.Scope)
}
}
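
// For example, either form of the variable works (hypothetical values):
//
//	export BIGQUERY_SERVICE_ACCOUNT=/etc/secrets/bq-service-account.json
//	export BIGQUERY_SERVICE_ACCOUNT=$(base64 < bq-service-account.json)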
// Close shuts down any open insert workers.
func Close() {
if insertWorker != nil {
log.Info("Waiting for BigQuery insert worker to finish")
insertWorker.Close()
insertWorker = nil
}
}
// Setup creates the BigQuery client wrapper. It assumes that the following
// environment variables are set:
//
//	BIGQUERY_PROJECT_ID: The ID of the BigQuery project, from the Google Cloud console.
//	BIGQUERY_DATASET_ID: The ID of the dataset to use.
//	BIGQUERY_SERVICE_ACCOUNT: A file path for a BigQuery service account
//	configuration OR a base64 encoded string with the service account credentials.
//
// The created wrapper is returned.
func Setup() *BigQueryWrapper {
config, err := bigqueryJWTConfig()
util.PanicOnError("Cannot load BigQuery credentials", err)
ctx := context.Background()
opt := option.WithTokenSource(config.TokenSource(ctx))
client, err := bigquery.NewClient(ctx, os.Getenv("BIGQUERY_PROJECT_ID"), opt)
util.PanicOnError("Cannot create client for BigQuery", err)
return NewBigQueryWrapper(client, os.Getenv("BIGQUERY_PROJECT_ID"), os.Getenv("BIGQUERY_DATASET_ID"))
}
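
// A minimal end-to-end sketch (hypothetical caller code, assuming the
// environment variables above are set and an "events" table exists):
//
//	bq.SetupStreamingInserts() // start the background insert workers
//	defer bq.Close()           // stop them on shutdown
//	wrapper := bq.Setup()
//	err := wrapper.AddRow("events", myEvent)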
func handleInsertError(insertErrs *bqstreamer.InsertErrors) {
	// Each message contains zero or more table errors, fetched with All().
	// Each table error contains zero or more insert attempts.
	// Each insert attempt contains zero or more rows, each carrying its own errors.
for _, table := range insertErrs.All() {
for _, attempt := range table.Attempts() {
// Log insert attempt error.
			if err := attempt.Error(); err != nil {
				log.WithFields(log.Fields{
					"project": attempt.Project,
					"dataset": attempt.Dataset,
					"table":   attempt.Table,
				}).WithError(err).Error("bigquery table insert error")
			}
// Iterate over all rows in attempt.
for _, row := range attempt.All() {
// Iterate over all errors in row and log.
				for _, err := range row.All() {
					log.WithFields(log.Fields{
						"project":  attempt.Project,
						"dataset":  attempt.Dataset,
						"table":    attempt.Table,
						"insertid": row.InsertID,
					}).WithError(err).Error("bigquery row insert error")
				}
}
}
}
}
// SetupStreamingInserts sets up workers that handle streaming inserts on
// multiple concurrent goroutines. It expects the same environment variables as Setup().
func SetupStreamingInserts() {
config, err := bigqueryJWTConfig()
util.PanicOnError("Cannot load BigQuery credentials", err)
// Error handling goroutine
// bqstreamer sends errors to the error channel.
errChan := make(chan *bqstreamer.InsertErrors)
go func() {
for insertErrs := range errChan {
handleInsertError(insertErrs)
}
}()
// Initialize a worker group.
	maxRetries := util.GetenvInt("BIGQUERY_STREAMING_MAX_RETRIES", 10)
	maxRows := util.GetenvInt("BIGQUERY_STREAMING_MAX_ROWS", 500)
	maxDelay := time.Duration(util.GetenvInt("BIGQUERY_STREAMING_MAX_DELAY", 1000)) * time.Millisecond
	insertWorker, err = bqstreamer.NewAsyncWorkerGroup(
		config,
		bqstreamer.SetAsyncNumWorkers(2),              // Number of background workers in the group.
		bqstreamer.SetAsyncMaxRows(maxRows),           // Number of rows that must be enqueued before an insert is executed.
		bqstreamer.SetAsyncMaxDelay(maxDelay),         // Maximum time to wait between inserts.
		bqstreamer.SetAsyncRetryInterval(maxDelay),    // Time to wait between failed insert retries.
		bqstreamer.SetAsyncMaxRetries(maxRetries),     // Maximum number of times a failed insert is retried.
		bqstreamer.SetAsyncIgnoreUnknownValues(false), // Do not ignore unknown fields when inserting rows.
		bqstreamer.SetAsyncSkipInvalidRows(false),     // Do not skip invalid rows when inserting.
		bqstreamer.SetAsyncErrorChannel(errChan),      // Unified error channel for all workers.
	)
	util.PanicOnError("Error setting up BigQuery streaming workers", err)
insertWorker.Start()
}
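
// The worker group can be tuned through environment variables (hypothetical
// values; the defaults are those shown above):
//
//	export BIGQUERY_STREAMING_MAX_RETRIES=5
//	export BIGQUERY_STREAMING_MAX_ROWS=250
//	export BIGQUERY_STREAMING_MAX_DELAY=500 # milliseconds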