/
collectionresource_storage.go
110 lines (95 loc) · 2.92 KB
/
collectionresource_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package esstorage
import (
"context"
"encoding/json"
"github.com/elastic/go-elasticsearch/v8"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
k8sjson "k8s.io/apimachinery/pkg/util/json"
internal "github.com/clusterpedia-io/api/clusterpedia"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
)
type CollectionResourceStorage struct {
index *Index
indexName string
collectionResource *internal.CollectionResource
}
func NewCollectionResourceStorage(client *elasticsearch.Client, indexName string, cr *internal.CollectionResource) storage.CollectionResourceStorage {
return &CollectionResourceStorage{
index: NewIndex(client),
indexName: indexName,
collectionResource: cr.DeepCopy(),
}
}
func (s *CollectionResourceStorage) Get(ctx context.Context, opts *internal.ListOptions) (*internal.CollectionResource, error) {
builder := NewQueryBuilder()
for _, rt := range s.collectionResource.ResourceTypes {
bool := NewBoolExpression()
bool.SetLogicType(Should)
bool.addExpression(NewTerms(GroupPath, []string{rt.Group}))
if len(rt.Resource) > 0 {
resourceTerm := NewTerms(ResourcePath, []string{rt.Resource})
bool.addExpression(resourceTerm)
}
if len(rt.Version) > 0 {
versionTerm := NewTerms(VersionPath, []string{rt.Version})
bool.addExpression(versionTerm)
}
builder.addExpression(bool)
}
if opts.OnlyMetadata == true {
builder.source = []string{
ApiVersionPath,
KindPath,
ObjectMetaPath,
}
}
err := applyListOptionToQueryBuilder(builder, opts)
if err != nil {
return nil, err
}
r, err := s.index.Search(ctx, builder.build(), []string{s.indexName})
if err != nil {
return nil, err
}
objects := make([]runtime.Object, 0, len(r.GetResources()))
collection := &internal.CollectionResource{
TypeMeta: s.collectionResource.TypeMeta,
ObjectMeta: s.collectionResource.ObjectMeta,
}
for _, item := range r.GetResources() {
object := item.Object
byte, err := json.Marshal(object)
if err != nil {
return nil, err
}
unObj, err := convertToUnstructured(byte)
if err != nil {
return nil, err
}
objects = append(objects, unObj)
gvrs := make(map[schema.GroupVersionResource]struct{})
if resourceType := item.GetResourceType(); !resourceType.Empty() {
gvr := resourceType.GroupVersionResource()
if _, ok := gvrs[gvr]; !ok {
gvrs[gvr] = struct{}{}
collection.ResourceTypes = append(collection.ResourceTypes, internal.CollectionResourceType{
Group: resourceType.Group,
Resource: resourceType.Resource,
Version: resourceType.Version,
Kind: resourceType.Kind,
})
}
}
}
collection.Items = objects
return collection, nil
}
func convertToUnstructured(data []byte) (*unstructured.Unstructured, error) {
obj := &unstructured.Unstructured{}
if err := k8sjson.Unmarshal(data, obj); err != nil {
return nil, err
}
return obj, nil
}