/
s3_impl.go
154 lines (136 loc) · 4.66 KB
/
s3_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright 2023 IBM Corp.
// SPDX-License-Identifier: Apache-2.0
package s3
import (
"context"
"strings"
"emperror.dev/errors"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/rs/zerolog"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
"fybrik.io/fybrik/pkg/logging"
"fybrik.io/fybrik/pkg/model/storagemanager"
"fybrik.io/fybrik/pkg/model/taxonomy"
"fybrik.io/fybrik/pkg/random"
"fybrik.io/fybrik/pkg/serde"
"fybrik.io/fybrik/pkg/storage/registrator"
"fybrik.io/fybrik/pkg/storage/registrator/agent"
"fybrik.io/fybrik/pkg/utils"
)
const (
nameHashLength = 10
endpointKey = "endpoint"
bucketKey = "bucket"
objectKey = "object_key"
)
// s3 storage manager implementation
type S3Impl struct {
Name taxonomy.ConnectionType
Log zerolog.Logger
}
func NewS3Impl() *S3Impl {
return &S3Impl{Name: "s3", Log: logging.LogInit(logging.CONNECTOR, "S3StorageManager")}
}
// register the implementation for s3
func init() {
s3Impl := NewS3Impl()
if err := registrator.Register(s3Impl); err != nil {
s3Impl.Log.Error().Err(err)
}
}
// return the supported connection type
func (impl *S3Impl) GetConnectionType() taxonomy.ConnectionType {
return impl.Name
}
// allocate storage for s3 - placeholder
func (impl *S3Impl) AllocateStorage(request *storagemanager.AllocateStorageRequest, client kclient.Client) (taxonomy.Connection, error) {
endpoint, err := agent.GetProperty(request.AccountProperties.Items, impl.Name, endpointKey)
if err != nil {
return taxonomy.Connection{}, err
}
// Initialize minio client object.
minioClient, err := NewClient(endpoint, &request.Secret, client)
if err != nil {
return taxonomy.Connection{}, err
}
genBucketName := generateBucketName(&request.Opts)
genObjectKey := generateObjectKey(&request.Opts)
if err = minioClient.MakeBucket(context.Background(), genBucketName, minio.MakeBucketOptions{}); err != nil {
return taxonomy.Connection{}, errors.Wrapf(err, "could not create a bucket %s", genBucketName)
}
connection := taxonomy.Connection{
Name: impl.Name,
AdditionalProperties: serde.Properties{
Items: map[string]interface{}{
string(impl.Name): map[string]interface{}{
endpointKey: endpoint,
bucketKey: genBucketName,
objectKey: genObjectKey,
},
},
},
}
return connection, nil
}
// delete s3 storage
func (impl *S3Impl) DeleteStorage(request *storagemanager.DeleteStorageRequest, client kclient.Client) error {
endpoint, err := agent.GetProperty(request.Connection.AdditionalProperties.Items, impl.Name, endpointKey)
if err != nil {
return err
}
bucket, err := agent.GetProperty(request.Connection.AdditionalProperties.Items, impl.Name, bucketKey)
if err != nil {
return err
}
// Initialize minio client object.
minioClient, err := NewClient(endpoint, &request.Secret, client)
if err != nil {
return err
}
exists, err := minioClient.BucketExists(context.Background(), bucket)
if !exists {
return kclient.IgnoreNotFound(err)
}
for object := range minioClient.ListObjects(context.Background(), bucket,
minio.ListObjectsOptions{Recursive: true}) {
if err := minioClient.RemoveObject(context.Background(), bucket, object.Key, minio.RemoveObjectOptions{}); err != nil {
return err
}
}
return minioClient.RemoveBucket(context.Background(), bucket)
}
func generateBucketName(opts *storagemanager.Options) string {
suffix, _ := random.Hex(nameHashLength)
name := opts.AppDetails.Name + "-" + opts.AppDetails.Namespace + suffix
return utils.S3ConformName(name)
}
func generateObjectKey(opts *storagemanager.Options) string {
return opts.DatasetProperties.Name + utils.Hash(opts.AppDetails.UUID, nameHashLength)
}
func NewClient(endpointArg string, secretKey *taxonomy.SecretRef, kClient kclient.Client) (*minio.Client, error) {
prefix := "https://"
useSSL := strings.HasPrefix(endpointArg, prefix)
var endpoint string
if !useSSL {
prefix = "http://"
}
endpoint = strings.TrimPrefix(endpointArg, prefix)
// Get credentials
secret := v1.Secret{}
if err := kClient.Get(context.Background(), types.NamespacedName{Name: secretKey.Name,
Namespace: secretKey.Namespace}, &secret); err != nil {
return nil, errors.Wrapf(err, "could not get a secret %s", secretKey.Name)
}
accessKey, secretAccessKey := string(secret.Data["access_key"]), string(secret.Data["secret_key"])
if accessKey == "" || secretAccessKey == "" {
return nil, errors.Errorf("could not retrieve credentials from the secret %s", secretKey.Name)
}
// Initialize minio client object.
return minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(accessKey, secretAccessKey, ""),
Secure: useSSL,
})
}