-
Notifications
You must be signed in to change notification settings - Fork 5
/
bucket.go
132 lines (116 loc) · 3.77 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package store
import (
"context"
"github.com/filedag-project/filedag-storage/objectservice/lock"
"github.com/filedag-project/filedag-storage/objectservice/objmetadb"
"github.com/filedag-project/filedag-storage/objectservice/pkg/policy"
"github.com/syndtr/goleveldb/leveldb"
"time"
)
const (
bucketPrefix = "bkt/"
userBucketInfoPrefix = "userbktinfo/"
bucketInfoPrefix = "allbktinfo/"
)
// BucketMetadataSys captures all bucket metadata for a given cluster.
type bucketMetadataSys struct {
bucketMetaStore objmetadb.ObjStoreMetaDBAPI
nsLock *lock.NsLockMap
emptyBucket func(ctx context.Context, bucket string) (bool, error)
}
// NewBucketMetadataSys - creates new policy system.
func NewBucketMetadataSys(db objmetadb.ObjStoreMetaDBAPI) *bucketMetadataSys {
return &bucketMetadataSys{
bucketMetaStore: db,
nsLock: lock.NewNSLock(),
}
}
// NewBucketMetadata creates BucketMetadata with the supplied name and Created to Now.
func NewBucketMetadata(name, region, accessKey string) *BucketMetadata {
p := policy.CreateUserBucketPolicy(name, accessKey)
return &BucketMetadata{
Name: name,
Region: region,
Owner: accessKey,
Created: time.Now().UTC(),
PolicyConfig: p,
}
}
// NewNSLock - initialize a new namespace RWLocker instance.
func (sys *bucketMetadataSys) NewNSLock(bucket string) lock.RWLocker {
return sys.nsLock.NewNSLock("meta", bucket)
}
func (sys *bucketMetadataSys) SetEmptyBucket(emptyBucket func(ctx context.Context, bucket string) (bool, error)) {
sys.emptyBucket = emptyBucket
}
// setBucketMeta - sets a new metadata in-db
func (sys *bucketMetadataSys) setBucketMeta(bucket string, meta *BucketMetadata) error {
return sys.bucketMetaStore.Put(bucketPrefix+bucket, meta)
}
// CreateBucket - create a new Bucket
func (sys *bucketMetadataSys) CreateBucket(ctx context.Context, bucket, region, accessKey string) error {
lk := sys.NewNSLock(bucket)
lkctx, err := lk.GetLock(ctx, globalOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
meta := NewBucketMetadata(bucket, region, accessKey)
err = sys.recordUserBucketInfo(ctx, bucket, accessKey, *meta)
if err != nil {
return err
}
err = sys.setBucketMeta(bucket, meta)
if err != nil {
sys.delUserBucketInfo(ctx, bucket, accessKey)
}
return err
}
func (sys *bucketMetadataSys) getBucketMeta(bucket string) (meta BucketMetadata, err error) {
err = sys.bucketMetaStore.Get(bucketPrefix+bucket, &meta)
if err == leveldb.ErrNotFound {
err = BucketNotFound{Bucket: bucket, Err: err}
}
return meta, err
}
// GetBucketMeta metadata for a bucket.
func (sys *bucketMetadataSys) GetBucketMeta(ctx context.Context, bucket string) (meta BucketMetadata, err error) {
lk := sys.NewNSLock(bucket)
lkctx, err := lk.GetRLock(ctx, globalOperationTimeout)
if err != nil {
return BucketMetadata{}, err
}
ctx = lkctx.Context()
defer lk.RUnlock(lkctx.Cancel)
return sys.getBucketMeta(bucket)
}
// HasBucket metadata for a bucket.
func (sys *bucketMetadataSys) HasBucket(ctx context.Context, bucket string) bool {
_, err := sys.GetBucketMeta(ctx, bucket)
return err == nil
}
// DeleteBucket bucket.
func (sys *bucketMetadataSys) DeleteBucket(ctx context.Context, bucket string, accessKey string) error {
lk := sys.NewNSLock(bucket)
lkctx, err := lk.GetLock(ctx, deleteOperationTimeout)
if err != nil {
return err
}
ctx = lkctx.Context()
defer lk.Unlock(lkctx.Cancel)
if _, err = sys.getBucketMeta(bucket); err != nil {
return err
}
if empty, err := sys.emptyBucket(ctx, bucket); err != nil {
return err
} else if !empty {
return ErrBucketNotEmpty
}
// todo deal del fail
err = sys.delUserBucketInfo(ctx, bucket, accessKey)
if err != nil {
return err
}
return sys.bucketMetaStore.Delete(bucketPrefix + bucket)
}