-
Notifications
You must be signed in to change notification settings - Fork 78
/
s3_state.go
133 lines (114 loc) · 2.93 KB
/
s3_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package remote
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"kusionstack.io/kusion/pkg/engine/states"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/zclconf/go-cty/cty"
)
var ErrS3NoExist = errors.New("s3: key not exist")
var _ states.StateStorage = &S3State{}
type S3State struct {
sess *session.Session
bucketName string
}
func NewS3State(endPoint, accessKeyID, accessKeySecret, bucketName string, region string) (*S3State, error) {
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(accessKeyID, accessKeySecret, ""),
Endpoint: aws.String(endPoint),
Region: aws.String(region),
DisableSSL: aws.Bool(true),
S3ForcePathStyle: aws.Bool(false),
})
if err != nil {
return nil, err
}
s3State := &S3State{
sess: sess,
bucketName: bucketName,
}
return s3State, nil
}
// ConfigSchema returns a description of the expected configuration
// structure for the receiving backend.
func (s *S3State) ConfigSchema() cty.Type {
return cty.Type{}
}
// Configure uses the provided configuration to set configuration fields
// within the S3State backend.
func (s *S3State) Configure(obj cty.Value) error {
return nil
}
func (s *S3State) Apply(state *states.State) error {
u, err := uuid.NewUUID()
if err != nil {
return err
}
jsonByte, err := json.MarshalIndent(state, "", " ")
if err != nil {
return err
}
prefix := state.Tenant + "/" + state.Project + "/" + state.Stack
svc := s3.New(s.sess)
_, err = svc.PutObject(&s3.PutObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(prefix + u.String()),
Body: bytes.NewReader(jsonByte),
})
if err != nil {
return err
}
return nil
}
func (s *S3State) Delete(id string) error {
panic("implement me")
}
func (s *S3State) GetLatestState(query *states.StateQuery) (*states.State, error) {
prefix := query.Tenant + "/" + query.Project + "/" + query.Stack
svc := s3.New(s.sess)
params := &s3.ListObjectsInput{
Bucket: aws.String(s.bucketName),
Delimiter: aws.String("/"),
Prefix: aws.String(prefix),
}
objects, err := svc.ListObjects(params)
if err != nil {
return nil, err
}
var result *s3.Object
if len(objects.Contents) == 0 {
return nil, ErrS3NoExist
}
for _, obj := range objects.Contents {
if result == nil || result.LastModified.UnixNano() < obj.LastModified.UnixNano() {
result = obj
}
}
if result == nil {
return nil, ErrS3NoExist
}
out, err := svc.GetObject(&s3.GetObjectInput{
Bucket: aws.String(s.bucketName),
Key: result.Key,
})
if err != nil {
return nil, err
}
defer out.Body.Close()
data, err := ioutil.ReadAll(out.Body)
if err != nil {
return nil, err
}
state := &states.State{}
err = json.Unmarshal(data, state)
if err != nil {
return nil, err
}
return state, nil
}