-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathclient.go
141 lines (119 loc) · 4.29 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright 2021 OTA Insight Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package batch
import (
"context"
"errors"
"fmt"
"io"
"github.com/OTA-Insight/bqwriter/log"
"cloud.google.com/go/bigquery"
"google.golang.org/api/option"
)
// errCouldNotConvertReader is the error Put returns when the data it is
// handed does not implement io.Reader.
var errCouldNotConvertReader = errors.New("BQ batch client: could not convert data into io.Reader")
// Client is the batch flavour of the BQ (cloud) client wrappers.
//
// Each call to Put wraps the given io.Reader in a bigquery.ReaderSource,
// runs a load job against the configured dataset/table and waits for that
// job to finish before returning, so nothing is buffered inside the Client.
type Client struct {
	// client is the underlying BigQuery client, used to resolve the target
	// table in Put and released in Close.
	client *bigquery.Client

	// dataSetID identifies the dataset that holds the target table.
	dataSetID string
	// tableID identifies the table the load jobs write into.
	tableID string

	// schema is the schema applied to the loaded data;
	// when nil, Put enables schema auto-detection instead.
	schema *bigquery.Schema
	// sourceFormat is copied onto the reader source of every load job.
	sourceFormat bigquery.DataFormat
	// ignoreUnknownValues is copied onto the reader source of every load job.
	ignoreUnknownValues bool
	// writeDisposition is copied onto the loader of every load job.
	writeDisposition bigquery.TableWriteDisposition

	// logger receives the individual status errors of a failed load job.
	logger log.Logger
}
// NewClient creates a BigQuery client for projectID (configured with opts)
// and wraps it in a batch Client targeting the given dataset and table.
//
// An error is returned when the underlying BigQuery client cannot be created.
func NewClient(projectID, dataSetID, tableID string, ignoreUnknownValues bool, sourceFormat bigquery.DataFormat, writeDisposition bigquery.TableWriteDisposition, schema *bigquery.Schema, logger log.Logger, opts ...option.ClientOption) (*Client, error) {
	// NOTE: the background context is used on purpose, so that writing
	// through the client stays possible even once the actual parent
	// context is done. The streamer batches its rows and relies on this.
	bqClient, createErr := bigquery.NewClient(context.Background(), projectID, opts...)
	if createErr != nil {
		return nil, fmt.Errorf("BQ batch client: creation failed: %w", createErr)
	}

	return newClient(bqClient, dataSetID, tableID, ignoreUnknownValues, sourceFormat, writeDisposition, schema, logger)
}
// newClient assembles a Client around an already created BigQuery client.
// It stores its arguments as-is and currently never returns a non-nil error.
func newClient(client *bigquery.Client, dataSetID, tableID string, ignoreUnknownValues bool, sourceFormat bigquery.DataFormat, writeDisposition bigquery.TableWriteDisposition, schema *bigquery.Schema, logger log.Logger) (*Client, error) {
	bqc := new(Client)

	// where to write
	bqc.client = client
	bqc.dataSetID = dataSetID
	bqc.tableID = tableID

	// how to write
	bqc.schema = schema
	bqc.sourceFormat = sourceFormat
	bqc.ignoreUnknownValues = ignoreUnknownValues
	bqc.writeDisposition = writeDisposition

	bqc.logger = logger

	return bqc, nil
}
// Put implements bqClient::Put
//
// The given data has to be an io.Reader; its content is uploaded to the
// configured table using a load job, and Put only returns once that job
// has finished. The returned bool is true when the data was written
// (and thus flushed), false together with an error otherwise.
func (bqc *Client) Put(data interface{}) (bool, error) {
	payload, isReader := data.(io.Reader)
	if !isReader {
		return false, errCouldNotConvertReader
	}

	// Describe the data that is to be loaded.
	src := bigquery.NewReaderSource(payload)
	src.SourceFormat = bqc.sourceFormat
	src.IgnoreUnknownValues = bqc.ignoreUnknownValues
	if bqc.schema != nil {
		src.Schema = *bqc.schema
	} else {
		// no schema was configured: let BigQuery detect it
		src.AutoDetect = true
	}

	// Prepare the load job for the target table.
	load := bqc.client.Dataset(bqc.dataSetID).Table(bqc.tableID).LoaderFrom(src)
	load.WriteDisposition = bqc.writeDisposition

	// Run the job and block until it is done.
	bgCtx := context.Background()
	job, err := load.Run(bgCtx)
	if err != nil {
		return false, fmt.Errorf("BQ batch client: failed to run loader: %w", err)
	}
	status, err := job.Wait(bgCtx)
	if err != nil {
		return false, fmt.Errorf("BQ batch client: job failed while waiting: %w", err)
	}

	jobErr := status.Err()
	if jobErr == nil {
		// Every successful write is also a flush.
		return true, nil
	}

	// The job itself failed: log each detailed error before reporting the failure.
	for i := range status.Errors {
		bqc.logger.Errorf("BQ batch client: status error: %v", status.Errors[i])
	}
	return false, fmt.Errorf("BQ batch client: job returned an error status: %w", jobErr)
}
// Flush implements bqClient::Flush
//
// It is a no-op that always returns nil.
func (bqc *Client) Flush() error {
	// NOTE: nothing is ever pending here, as Put runs its load job
	// and waits for it to finish before returning.
	return nil
}
// Close implements bqClient::Close
//
// It closes the underlying BigQuery client and wraps the error, if any.
func (bqc *Client) Close() error {
	// Flushing first is not needed: this is an internal client
	// used by the Streamer only, which flushes before it closes.
	closeErr := bqc.client.Close()
	if closeErr == nil {
		return nil
	}
	return fmt.Errorf("BQ batch client: failed while closing: %w", closeErr)
}