-
Notifications
You must be signed in to change notification settings - Fork 550
/
pod_template_store.go
92 lines (78 loc) · 2.95 KB
/
pod_template_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package flytek8s
import (
"context"
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/flyteorg/flyte/flytestdlib/logger"
)
var DefaultPodTemplateStore PodTemplateStore = NewPodTemplateStore()
// PodTemplateStore maintains a thread-safe mapping of active PodTemplates with their associated
// namespaces.
type PodTemplateStore struct {
*sync.Map
defaultNamespace string
}
// NewPodTemplateStore initializes a new PodTemplateStore
func NewPodTemplateStore() PodTemplateStore {
return PodTemplateStore{
Map: &sync.Map{},
}
}
// Delete removes the specified PodTemplate from the store.
func (p *PodTemplateStore) Delete(podTemplate *v1.PodTemplate) {
if value, ok := p.Load(podTemplate.Name); ok {
podTemplates := value.(*sync.Map)
podTemplates.Delete(podTemplate.Namespace)
logger.Debugf(context.Background(), "deleted PodTemplate '%s:%s' from store", podTemplate.Namespace, podTemplate.Name)
// we specifically are not deleting empty maps from the store because this may introduce race
// conditions where a PodTemplate is being added to the 2nd dimension map while the top level map
// is concurrently being deleted.
}
}
// LoadOrDefault returns the PodTemplate with the specified name in the given namespace. If one
// does not exist it attempts to retrieve the one associated with the defaultNamespace.
func (p *PodTemplateStore) LoadOrDefault(namespace string, podTemplateName string) *v1.PodTemplate {
if value, ok := p.Load(podTemplateName); ok {
podTemplates := value.(*sync.Map)
if podTemplate, ok := podTemplates.Load(namespace); ok {
return podTemplate.(*v1.PodTemplate)
}
if podTemplate, ok := podTemplates.Load(p.defaultNamespace); ok {
return podTemplate.(*v1.PodTemplate)
}
}
return nil
}
// SetDefaultNamespace sets the default namespace for the PodTemplateStore.
func (p *PodTemplateStore) SetDefaultNamespace(namespace string) {
p.defaultNamespace = namespace
}
// Store loads the specified PodTemplate into the store.
func (p *PodTemplateStore) Store(podTemplate *v1.PodTemplate) {
value, _ := p.LoadOrStore(podTemplate.Name, &sync.Map{})
podTemplates := value.(*sync.Map)
podTemplates.Store(podTemplate.Namespace, podTemplate)
logger.Debugf(context.Background(), "registered PodTemplate '%s:%s' in store", podTemplate.Namespace, podTemplate.Name)
}
// GetPodTemplateUpdatesHandler returns a new ResourceEventHandler which adds / removes
// PodTemplates to / from the provided PodTemplateStore.
func GetPodTemplateUpdatesHandler(store *PodTemplateStore) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if podTemplate, ok := obj.(*v1.PodTemplate); ok {
store.Store(podTemplate)
}
},
UpdateFunc: func(old, new interface{}) {
if podTemplate, ok := new.(*v1.PodTemplate); ok {
store.Store(podTemplate)
}
},
DeleteFunc: func(obj interface{}) {
if podTemplate, ok := obj.(*v1.PodTemplate); ok {
store.Delete(podTemplate)
}
},
}
}