-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfig.go
80 lines (66 loc) · 1.8 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package bigquery
import (
"fmt"
"time"
"cloud.google.com/go/bigquery"
"golang.org/x/oauth2/google"
"golang.org/x/oauth2/jwt"
)
// TablePeriod enumerates the period options a TableSchema can carry in its
// Period field. How each value is interpreted is decided by code outside this
// declaration.
type TablePeriod int
// Fallback values that NewConfig substitutes when the corresponding argument
// is zero or negative.
const (
	defaultQueueSize   = 1000 // fallback for queueSize
	defaultWorkerSize  = 10   // fallback for workerSize
	defaultWorkerStack = 500  // fallback for workerStack; size recommended by the BigQuery documentation (per the original author's note)
)
// Values of TablePeriod. NotExist is the zero value, so a TablePeriod that is
// never assigned compares equal to NotExist.
const (
	NotExist TablePeriod = iota // 0: no period
	Daily                       // 1: presumably one table per day; verify against the code that consumes Period
	Monthly                     // 2: presumably one table per month; verify against the code that consumes Period
	Yearly                      // 3: presumably one table per year; verify against the code that consumes Period
)
// Config holds the settings assembled by NewConfig. Every field is unexported,
// so values can only be set from inside this package.
type Config struct {
	projectId   string         // GCP project ID
	jwt         *jwt.Config    // GCP JWT config; NewConfig builds it from the credentials JSON with the BigQuery scope
	schemas     []*TableSchema // table schemas
	queueSize   int            // maximum queue size; NewConfig falls back to defaultQueueSize when <= 0
	workerSize  int            // worker count; NewConfig falls back to defaultWorkerSize when <= 0
	workerStack int            // worker stack size; NewConfig falls back to defaultWorkerStack when <= 0
	workerDelay time.Duration  // how long a worker waits before inserting; stored as given, without validation
}
// TableSchema describes one BigQuery table definition passed to NewConfig.
type TableSchema struct {
	DatasetId string                  // BigQuery dataset ID
	Prefix    string                  // BigQuery table name prefix
	Meta      *bigquery.TableMetadata // BigQuery table metadata; NewConfig rejects an empty Meta.Schema
	Period    TablePeriod             // one of NotExist, Daily, Monthly, Yearly
}
// NewConfig validates its arguments and assembles a Config.
//
// projectId, jwtbys and schemas are required: an empty value for any of them
// yields an error. Every entry of schemas must be non-nil, carry non-nil Meta
// and have at least one field in Meta.Schema; otherwise an error is returned.
//
// queueSize, workerSize and workerStack fall back to defaultQueueSize,
// defaultWorkerSize and defaultWorkerStack respectively when they are zero or
// negative. workerDelay is stored as given.
//
// jwtbys is parsed with google.JWTConfigFromJSON using the BigQuery scope; a
// parse failure is returned to the caller unchanged.
func NewConfig(projectId string, jwtbys []byte, schemas []*TableSchema, queueSize, workerSize, workerStack int, workerDelay time.Duration) (*Config, error) {
	if len(jwtbys) == 0 || projectId == "" || len(schemas) == 0 {
		return nil, fmt.Errorf("[err] NewConfig empty params")
	}

	for _, schema := range schemas {
		// Guard the pointer chain first: a nil entry or nil Meta would
		// otherwise panic on the Meta.Schema dereference.
		if schema == nil || schema.Meta == nil || len(schema.Meta.Schema) == 0 {
			return nil, fmt.Errorf("[err] NewConfig empty schema")
		}
	}

	if queueSize <= 0 {
		queueSize = defaultQueueSize
	}
	if workerSize <= 0 {
		workerSize = defaultWorkerSize
	}
	if workerStack <= 0 {
		workerStack = defaultWorkerStack
	}

	// Named jwtConfig rather than jwt so the imported jwt package is not
	// shadowed inside this function.
	jwtConfig, err := google.JWTConfigFromJSON(jwtbys, bigquery.Scope)
	if err != nil {
		return nil, err
	}

	return &Config{
		projectId:   projectId,
		jwt:         jwtConfig,
		schemas:     schemas,
		queueSize:   queueSize,
		workerSize:  workerSize,
		workerStack: workerStack,
		workerDelay: workerDelay,
	}, nil
}