forked from raystack/optimus
-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.go
173 lines (141 loc) · 4.39 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package bigquery
import (
"context"
"fmt"
"strings"
"cloud.google.com/go/bigquery"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"github.com/goto/optimus/internal/errors"
)
type BqClientProvider struct{}
func NewClientProvider() *BqClientProvider {
return &BqClientProvider{}
}
func (BqClientProvider) Get(ctx context.Context, account string) (Client, error) {
return NewClient(ctx, account)
}
type BqClient struct {
*bigquery.Client
}
func NewClient(ctx context.Context, svcAccount string) (*BqClient, error) {
cred, err := google.CredentialsFromJSON(ctx, []byte(svcAccount), bigquery.Scope)
if err != nil {
return nil, errors.InternalError(store, "failed to read account", err)
}
c, err := bigquery.NewClient(ctx, cred.ProjectID, option.WithCredentials(cred))
if err != nil {
return nil, errors.InternalError(store, "failed to create BQ client", err)
}
return &BqClient{c}, nil
}
func (c *BqClient) DatasetHandleFrom(ds Dataset) ResourceHandle {
dsHandle := c.DatasetInProject(ds.Project, ds.DatasetName)
return NewDatasetHandle(dsHandle)
}
func (c *BqClient) TableHandleFrom(ds Dataset, name string) TableResourceHandle {
t := c.DatasetInProject(ds.Project, ds.DatasetName).Table(name)
return NewTableHandle(t)
}
func (c *BqClient) ExternalTableHandleFrom(ds Dataset, name string) ResourceHandle {
t := c.DatasetInProject(ds.Project, ds.DatasetName).Table(name)
return NewExternalTableHandle(t)
}
func (c *BqClient) BulkGetDDLView(ctx context.Context, pd ProjectDataset, names []string) (map[ResourceURN]string, error) {
me := errors.NewMultiError("bulk get ddl view errors")
urnToDDL := make(map[ResourceURN]string, len(names))
for _, name := range names {
resourceURN, err := NewResourceURN(pd.Project, pd.Dataset, name)
if err != nil {
me.Append(err)
continue
}
urnToDDL[resourceURN] = ""
}
queryContent := buildGetDDLQuery(pd.Project, pd.Dataset, names...)
queryStatement := c.Client.Query(queryContent)
rowIterator, err := queryStatement.Read(ctx)
if err != nil {
return urnToDDL, err
}
for {
var values []bigquery.Value
if err := rowIterator.Next(&values); err != nil {
if errors.Is(err, iterator.Done) {
break
}
me.Append(err)
continue
}
if len(values) == 0 {
continue
}
name, ddl, err := getViewDDL(values)
if err != nil {
me.Append(err)
continue
}
resourceURN, err := NewResourceURN(pd.Project, pd.Dataset, name)
if err != nil {
me.Append(err)
continue
}
urnToDDL[resourceURN] = ddl
}
return urnToDDL, me.ToErr()
}
func (c *BqClient) ViewHandleFrom(ds Dataset, name string) ResourceHandle {
t := c.DatasetInProject(ds.Project, ds.DatasetName).Table(name)
return NewViewHandle(t)
}
func buildGetDDLQuery(project, dataset string, tables ...string) string {
var nameQueries, prefixQueries []string
const wildCardSuffix = "*"
for _, n := range tables {
if strings.HasSuffix(n, wildCardSuffix) {
prefix, _ := strings.CutSuffix(n, wildCardSuffix)
prefixQuery := fmt.Sprintf("STARTS_WITH(table_name, '%s')", prefix)
prefixQueries = append(prefixQueries, prefixQuery)
} else {
nameQuery := fmt.Sprintf("'%s'", n)
nameQueries = append(nameQueries, nameQuery)
}
}
names := strings.Join(nameQueries, ", ")
prefixes := strings.Join(prefixQueries, " or\n")
var whereClause string
if len(nameQueries) > 0 && len(prefixQueries) > 0 {
whereClause = fmt.Sprintf("WHERE table_name in (%s) or %s", names, prefixes)
} else if len(nameQueries) > 0 {
whereClause = fmt.Sprintf("WHERE table_name in (%s)", names)
} else if len(prefixQueries) > 0 {
whereClause = fmt.Sprintf("WHERE %s", prefixes)
}
return "SELECT table_catalog, table_schema, table_name, table_type, ddl\n" +
fmt.Sprintf("FROM `%s.%s.INFORMATION_SCHEMA.TABLES`\n", project, dataset) +
whereClause
}
func getViewDDL(values []bigquery.Value) (string, string, error) {
const expectedSchemaRowLen = 5
const viewType = "VIEW"
if l := len(values); l != expectedSchemaRowLen {
return "", "", fmt.Errorf("unexpected number of row length: %d", l)
}
name, ok := values[2].(string)
if !ok {
return "", "", fmt.Errorf("error casting name")
}
_type, ok := values[3].(string)
if !ok {
return "", "", fmt.Errorf("error casting _type")
}
ddl, ok := values[4].(string)
if !ok {
return "", "", fmt.Errorf("error casting ddl")
}
if _type == viewType {
return name, ddl, nil
}
return name, "", nil
}