forked from raystack/optimus
-
Notifications
You must be signed in to change notification settings - Fork 1
/
dataset.go
121 lines (100 loc) · 3.19 KB
/
dataset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package bigquery
import (
"context"
"net/http"
"strings"
"time"
"cloud.google.com/go/bigquery"
"google.golang.org/api/googleapi"
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/internal/errors"
)
const (
locationKey = "location"
tableExpirationKey = "table_expiration"
)
type BqDataset interface {
Create(context.Context, *bigquery.DatasetMetadata) error
Update(context.Context, bigquery.DatasetMetadataToUpdate, string) (*bigquery.DatasetMetadata, error)
Metadata(context.Context) (*bigquery.DatasetMetadata, error)
}
type DatasetHandle struct {
bqDataset BqDataset
}
func (d DatasetHandle) Create(ctx context.Context, res *resource.Resource) error {
details, err := ConvertSpecTo[DatasetDetails](res)
if err != nil {
return err
}
err = d.bqDataset.Create(ctx, toBQDatasetMetadata(details, res))
if err != nil {
var metaErr *googleapi.Error
if errors.As(err, &metaErr) &&
metaErr.Code == 409 && strings.Contains(metaErr.Message, "Already Exists") {
return errors.AlreadyExists(EntityDataset, "dataset already exists on bigquery: "+res.FullName())
}
return errors.InternalError(EntityDataset, "failed to create resource "+res.FullName(), err)
}
return nil
}
func (d DatasetHandle) Update(ctx context.Context, res *resource.Resource) error {
details, err := ConvertSpecTo[DatasetDetails](res)
if err != nil {
return err
}
metadataToUpdate := bigquery.DatasetMetadataToUpdate{}
if len(details.Description) > 0 {
metadataToUpdate.Description = details.Description
}
expirationAsInt := ConfigAs[float64](details.ExtraConfig, tableExpirationKey)
if expirationAsInt > 0 {
metadataToUpdate.DefaultTableExpiration = time.Hour * time.Duration(expirationAsInt)
}
for k, v := range res.Metadata().Labels {
metadataToUpdate.SetLabel(k, v)
}
_, err = d.bqDataset.Update(ctx, metadataToUpdate, "")
if err != nil {
var metaErr *googleapi.Error
if errors.As(err, &metaErr) && metaErr.Code == http.StatusNotFound {
return errors.NotFound(EntityDataset, "failed to update dataset in bigquery for "+res.FullName())
}
return errors.InternalError(EntityDataset, "failed to update resource on bigquery for "+res.FullName(), err)
}
return nil
}
func (d DatasetHandle) Exists(ctx context.Context) bool {
_, err := d.bqDataset.Metadata(ctx)
// There can be connection issue, we return false for now
return err == nil
}
func NewDatasetHandle(ds BqDataset) *DatasetHandle {
return &DatasetHandle{bqDataset: ds}
}
func toBQDatasetMetadata(details *DatasetDetails, res *resource.Resource) *bigquery.DatasetMetadata {
meta := &bigquery.DatasetMetadata{
Description: details.Description,
Labels: res.Metadata().Labels,
}
location := ConfigAs[string](details.ExtraConfig, locationKey)
if location != "" {
meta.Location = location
}
// structpb from proto returns a number value as float64
expirationAsInt := ConfigAs[float64](details.ExtraConfig, tableExpirationKey)
if expirationAsInt > 0 {
meta.DefaultTableExpiration = time.Hour * time.Duration(expirationAsInt)
}
return meta
}
func ConfigAs[T any](mapping map[string]any, key string) T {
var zero T
val, ok := mapping[key]
if ok {
s, ok := val.(T)
if ok {
return s
}
}
return zero
}