-
Notifications
You must be signed in to change notification settings - Fork 124
/
memory_storage.go
109 lines (87 loc) · 2.92 KB
/
memory_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package memorystorage
import (
"context"
"k8s.io/apimachinery/pkg/runtime/schema"
internal "github.com/clusterpedia-io/api/clusterpedia"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
)
type StorageFactory struct {
clusters map[string]bool
}
func (s *StorageFactory) GetSupportedRequestVerbs() []string {
return []string{"get", "list", "watch"}
}
func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
storages.Lock()
defer storages.Unlock()
gvr := schema.GroupVersionResource{
Group: config.GroupResource.Group,
Version: config.StorageVersion.Version,
Resource: config.GroupResource.Resource,
}
resourceStorage, ok := storages.resourceStorages[gvr]
if ok {
watchCache := resourceStorage.watchCache
if config.Namespaced && !watchCache.IsNamespaced {
watchCache.KeyFunc = cache.GetKeyFunc(gvr, config.Namespaced)
watchCache.IsNamespaced = true
}
} else {
watchCache := cache.NewWatchCache(100, gvr, config.Namespaced)
resourceStorage = &ResourceStorage{
incoming: make(chan ClusterWatchEvent, 100),
Codec: config.Codec,
watchCache: watchCache,
storageConfig: config,
}
storages.resourceStorages[gvr] = resourceStorage
}
for cluster := range s.clusters {
resourceStorage.watchCache.AddIndexer(cluster, nil)
if resourceStorage.CrvSynchro == nil {
resourceStorage.CrvSynchro = cache.NewClusterResourceVersionSynchro(cluster)
} else {
resourceStorage.CrvSynchro.SetClusterResourceVersion(cluster, "0")
}
}
return resourceStorage, nil
}
func (s *StorageFactory) PrepareCluster(cluster string) error {
storages.Lock()
defer storages.Unlock()
if _, ok := s.clusters[cluster]; ok {
return nil
}
s.clusters[cluster] = true
return nil
}
func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionResource) (storage.CollectionResourceStorage, error) {
return nil, nil
}
func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
return nil, nil
}
func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
storages.Lock()
defer storages.Unlock()
for _, rs := range storages.resourceStorages {
rs.CrvSynchro.RemoveCluster(cluster)
rs.watchCache.CleanCluster(cluster)
delete(s.clusters, cluster)
}
return nil
}
func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
storages.Lock()
defer storages.Unlock()
if rs, ok := storages.resourceStorages[gvr]; ok {
rs.CrvSynchro.RemoveCluster(cluster)
rs.watchCache.CleanCluster(cluster)
delete(s.clusters, cluster)
}
return nil
}
func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error) {
return nil, nil
}