-
Notifications
You must be signed in to change notification settings - Fork 39
/
prefix.go
157 lines (140 loc) · 3.99 KB
/
prefix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package bsdb
import (
"fmt"
"path/filepath"
"strings"
"time"
"github.com/forbole/juno/v4/common"
"github.com/spaolacci/murmur3"
"gorm.io/gorm"
)
// PrefixesNumberOfShards is the modulus applied to the hash of a bucket name
// when selecting the shard of the prefixes table that holds that bucket.
const PrefixesNumberOfShards = 64
// ListObjects lists the prefix-tree entries of a bucket and converts them into
// ListObjectsResult values.
//
// prefix is split by processPath into a directory part and a name part. Each
// non-empty part adds a query scope (PathNameFilter and NameFilter), and a
// non-empty continuationToken adds FullNameFilter. Rows are read from the
// bucket's prefixes shard table, restricted to bucketName and ordered by
// full_name.
//
// Up to maxKeys+1 rows are fetched; the extra row is what allows a
// NextContinuationToken to be returned to the client.
//
// The outcome (success or failure, with elapsed time) is reported through the
// metadata database metrics helpers when the function returns.
func (b *BsDBImpl) ListObjects(bucketName, continuationToken, prefix string, maxKeys int) ([]*ListObjectsResult, error) {
	var (
		nodes   []*SlashPrefixTreeNode
		filters []func(*gorm.DB) *gorm.DB
		res     []*ListObjectsResult
		err     error
	)

	startTime := time.Now()
	methodName := currentFunction()
	// The closure reads err at return time, so every later assignment must
	// target this same variable (use "=", never ":=", for err below).
	defer func() {
		if err != nil {
			MetadataDatabaseFailureMetrics(err, startTime, methodName)
		} else {
			MetadataDatabaseSuccessMetrics(startTime, methodName)
		}
	}()

	// return NextContinuationToken by adding 1 additionally
	limit := maxKeys + 1

	pathName, prefixQuery := processPath(prefix)
	if pathName != "" {
		filters = append(filters, PathNameFilter(pathName))
	}
	if prefixQuery != "" {
		filters = append(filters, NameFilter(prefixQuery))
	}
	if continuationToken != "" {
		filters = append(filters, FullNameFilter(continuationToken))
	}

	err = b.db.Table(GetPrefixesTableName(bucketName)).
		Where("bucket_name = ?", bucketName).
		Scopes(filters...).
		Order("full_name").
		Limit(limit).
		Find(&nodes).Error
	if err != nil {
		return nil, err
	}

	res, err = b.filterObjects(nodes, bucketName)
	return res, err
}
// processPath splits pathName into a directory part and a file part.
//
// When pathName contains no "/", the directory is reported as "/" and the file
// is pathName itself. Otherwise the split is delegated to filepath.Split, so
// the directory keeps its trailing separator and the file is whatever follows
// the last separator (possibly empty).
//
// NOTE(review): filepath.Split follows the host OS separator rules; the names
// handled here are presumably always slash-separated — confirm whether
// path.Split is the intended behaviour on non-Unix hosts.
func processPath(pathName string) (string, string) {
	if strings.Contains(pathName, "/") {
		return filepath.Split(pathName)
	}
	return "/", pathName
}
// filterObjects converts prefix-tree nodes into ListObjectsResult values.
//
// Nodes whose IsObject flag is set are looked up by ObjectID in the bucket's
// objects table and emitted with ResultType ObjectName and the loaded Object.
// An object node whose ID is not found in the table is skipped. All other
// nodes are emitted with ResultType CommonPrefix and an empty Object. The
// output keeps the order of nodes.
//
// When no node is an object, the objects table is not queried at all.
//
// It returns the database error, if any; the outcome and elapsed time are
// reported through the metadata database metrics helpers on return.
func (b *BsDBImpl) filterObjects(nodes []*SlashPrefixTreeNode, bucketName string) ([]*ListObjectsResult, error) {
	var (
		objects []*Object
		// res stays nil when nothing is emitted, as callers may distinguish
		// a nil slice from an empty one (e.g. when encoding the response).
		res []*ListObjectsResult
		err error
	)

	startTime := time.Now()
	methodName := currentFunction()
	// The closure reads err at return time, so assignments below must reuse
	// this variable rather than shadow it.
	defer func() {
		if err != nil {
			MetadataDatabaseFailureMetrics(err, startTime, methodName)
		} else {
			MetadataDatabaseSuccessMetrics(startTime, methodName)
		}
	}()

	// Collect the IDs of the nodes that represent objects.
	objectIDs := make([]common.Hash, 0, len(nodes))
	for _, node := range nodes {
		if node.IsObject {
			objectIDs = append(objectIDs, node.ObjectID)
		}
	}

	// Skip the round trip when there is nothing to look up; an empty IN list
	// cannot match any row.
	if len(objectIDs) > 0 {
		err = b.db.Table(GetObjectsTableName(bucketName)).
			Where("object_id in (?)", objectIDs).
			Find(&objects).Error
		if err != nil {
			return nil, err
		}
	}

	// Index the loaded objects by ID for the per-node lookup below.
	objectsMap := make(map[common.Hash]*Object, len(objects))
	for _, object := range objects {
		objectsMap[object.ObjectID] = object
	}

	for _, node := range nodes {
		if !node.IsObject {
			res = append(res, &ListObjectsResult{
				PathName:   node.FullName,
				ResultType: CommonPrefix,
				Object:     &Object{},
			})
			continue
		}

		object := objectsMap[node.ObjectID]
		if object == nil {
			// The prefix tree references an object that the objects table
			// did not return; leave it out of the listing.
			continue
		}
		res = append(res, &ListObjectsResult{
			PathName:   node.FullName,
			ResultType: ObjectName,
			Object:     object,
		})
	}
	return res, nil
}
func GetPrefixesTableName(bucketName string) string {
return GetPrefixesTableNameByShardNumber(int(GetPrefixesShardNumberByBucketName(bucketName)))
}
// GetPrefixesShardNumberByBucketName returns the shard number for bucketName:
// the 32-bit murmur3 hash of the name's bytes, reduced modulo
// PrefixesNumberOfShards.
func GetPrefixesShardNumberByBucketName(bucketName string) uint32 {
	hash := murmur3.Sum32([]byte(bucketName))
	return hash % PrefixesNumberOfShards
}
// GetPrefixesTableNameByShardNumber returns the prefixes table name for the
// given shard: PrefixTreeTableName, an underscore, and the shard number
// zero-padded to at least two digits.
func GetPrefixesTableNameByShardNumber(shard int) string {
	const layout = "%s_%02d"
	return fmt.Sprintf(layout, PrefixTreeTableName, shard)
}