forked from raystack/meteor
-
Notifications
You must be signed in to change notification settings - Fork 1
/
util.go
154 lines (132 loc) · 3.99 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package plugins
import (
"errors"
"fmt"
"io"
"net"
"net/http"
"reflect"
"sort"
"strings"
"github.com/go-playground/validator/v10"
"github.com/googleapis/gax-go/v2/apierror"
"github.com/goto/meteor/models"
"github.com/mcuadros/go-defaults"
"github.com/mitchellh/mapstructure"
)
// validate is the package-level validator used by buildConfig. It is
// configured once in init.
var validate *validator.Validate

// init creates the shared validator and registers a tag-name function so that
// validation errors refer to fields by the name given in their `mapstructure`
// struct tag.
func init() {
	validate = validator.New()
	validate.RegisterTagNameFunc(func(fld reflect.StructField) string {
		// Only the part of the tag before the first comma is the name; anything
		// after it is tag options.
		tag := fld.Tag.Get("mapstructure")
		name := tag
		if comma := strings.IndexByte(tag, ','); comma >= 0 {
			name = tag[:comma]
		}
		// A name of "-" is reported as an empty name.
		if name == "-" {
			return ""
		}
		return name
	})
}
// buildConfig fills the config struct pointed to by c from configMap and then
// validates it.
//
// Defaults are applied to c first, then configMap is decoded on top with
// mapstructure using weakly typed input, with decode hooks that turn strings
// into time.Duration values and comma-separated strings into slices.
//
// Decoder creation and decoding failures are returned wrapped. When validation
// fails with validator.ValidationErrors, an InvalidConfigError is returned with
// one ConfigError per failed field; any other validation error is returned
// unchanged.
func buildConfig(configMap map[string]interface{}, c interface{}) (err error) {
	defaults.SetDefaults(c)

	hooks := mapstructure.ComposeDecodeHookFunc(
		mapstructure.StringToTimeDurationHookFunc(),
		mapstructure.StringToSliceHookFunc(","),
	)
	decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		DecodeHook:       hooks,
		WeaklyTypedInput: true,
		Result:           c,
	})
	if err != nil {
		return fmt.Errorf("create new mapstructure decoder: %w", err)
	}

	if err = decoder.Decode(configMap); err != nil {
		return fmt.Errorf("decode with mapstructure: %w", err)
	}

	err = validate.Struct(c)
	if err == nil {
		return nil
	}

	var fieldErrs validator.ValidationErrors
	if !errors.As(err, &fieldErrs) {
		return err
	}

	var configErrors []ConfigError
	for _, fe := range fieldErrs {
		// Strip the leading "Config." from the namespace so the key reads as
		// the path inside the config.
		key := strings.TrimPrefix(fe.Namespace(), "Config.")
		msg := fmt.Sprintf("validation for field '%s' failed on the '%s' tag", key, fe.Tag())
		configErrors = append(configErrors, ConfigError{Key: key, Message: msg})
	}
	return InvalidConfigError{Errors: configErrors}
}
// BigQueryTableFQNToURN converts a BigQuery table FQN in the
// projectID:datasetID.tableID format into a BigQuery table URN.
// It returns an error, prefixed with "map URN", when the FQN cannot be parsed.
func BigQueryTableFQNToURN(fqn string) (string, error) {
	project, dataset, table, parseErr := parseBQTableFQN(fqn)
	if parseErr != nil {
		return "", fmt.Errorf("map URN: %w", parseErr)
	}

	urn := BigQueryURN(project, dataset, table)
	return urn, nil
}
func BigQueryURN(projectID, datasetID, tableID string) string {
fqn := fmt.Sprintf("%s:%s.%s", projectID, datasetID, tableID)
return models.NewURN("bigquery", projectID, "table", fqn)
}
func KafkaURN(bootstrapServers, topic string) string {
return models.NewURN("kafka", KafkaServersToScope(bootstrapServers), "topic", topic)
}
// KafkaServersToScope derives a URN scope from a comma-separated list of Kafka
// bootstrap servers.
//
// Each entry is trimmed of surrounding whitespace and stripped of its port;
// entries that cannot be split into host and port (for example a bare host
// name) are kept as they are. Empty entries, such as the one produced by a
// trailing comma, are ignored. The resulting hosts are sorted and joined with
// "," so that the same set of servers maps to the same scope regardless of the
// order in which they were listed. A single server therefore maps to just its
// host.
func KafkaServersToScope(servers string) string {
	entries := strings.Split(servers, ",")
	hosts := make([]string, 0, len(entries))
	for _, entry := range entries {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			// Skip blanks left by leading, trailing or doubled commas so they
			// do not show up as stray separators in the scope.
			continue
		}

		host, _, err := net.SplitHostPort(entry)
		if err != nil {
			// No port to strip; use the entry unchanged.
			host = entry
		}
		hosts = append(hosts, host)
	}

	sort.Strings(hosts)
	return strings.Join(hosts, ",")
}
// CaraMLStoreURN builds the URN of a CaraML Store feature table. The feature
// table is identified as "project.featureTable", and the URN is created via
// models.NewURN with "caramlstore", the given scope, "feature_table" and that
// identifier.
func CaraMLStoreURN(scope, project, featureTable string) string {
	tableRef := project + "." + featureTable
	return models.NewURN("caramlstore", scope, "feature_table", tableRef)
}
// DrainBody drains and closes the response body to avoid the following
// gotcha:
// http://devs.cloudimmunity.com/gotchas-and-common-mistakes-in-go-golang/index.html#close_http_resp_body
//
// It is a no-op when resp or resp.Body is nil, so it is safe to call without
// first checking whether a response was actually received.
func DrainBody(resp *http.Response) {
	if resp == nil || resp.Body == nil {
		return
	}

	// Errors are deliberately ignored: this is best-effort cleanup and there is
	// nothing useful the caller could do with a failure here.
	_, _ = io.Copy(io.Discard, resp.Body)
	_ = resp.Body.Close()
}
// BQErrReason returns the reason reported by the *apierror.APIError found in
// err's chain, or "UNKNOWN" when err does not contain one.
func BQErrReason(err error) string {
	var apiErr *apierror.APIError
	if !errors.As(err, &apiErr) {
		return "UNKNOWN"
	}
	return apiErr.Reason()
}
// parseBQTableFQN splits a BigQuery table FQN in the
// projectID:datasetID.tableID format into its three components.
//
// The FQN is split on the first ':' and then on the first '.' that follows it.
// An error is returned when any component is empty, when the project ID
// contains a '.', or when the dataset or table ID contains a further ':' or
// '.'. Splitting positionally, rather than collecting non-empty fields, makes
// sure a malformed FQN such as ":a.b.c" is rejected instead of being parsed
// with its components shifted.
func parseBQTableFQN(fqn string) (projectID, datasetID, tableID string, err error) {
	var project, dataset, table string
	if colon := strings.IndexByte(fqn, ':'); colon >= 0 {
		project = fqn[:colon]
		rest := fqn[colon+1:]
		if dot := strings.IndexByte(rest, '.'); dot >= 0 {
			dataset, table = rest[:dot], rest[dot+1:]
		}
	}

	// A missing separator leaves the corresponding components empty, so the
	// emptiness checks also cover FQNs without ':' or without '.'.
	valid := project != "" && dataset != "" && table != "" &&
		!strings.ContainsRune(project, '.') &&
		!strings.ContainsRune(dataset, ':') &&
		!strings.ContainsAny(table, ":.")
	if !valid {
		return "", "", "", fmt.Errorf(
			"unexpected BigQuery table FQN '%s', expected in format projectID:datasetID.tableID", fqn,
		)
	}

	return project, dataset, table, nil
}