-
Notifications
You must be signed in to change notification settings - Fork 2
/
provisioner.go
132 lines (111 loc) · 3.25 KB
/
provisioner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package provisioner
import (
"fmt"
"reflect"
"github.com/Sirupsen/logrus"
clusterV1 "github.com/rancher/cluster-controller/client/v1"
"github.com/rancher/cluster-controller/controller"
"github.com/rancher/cluster-controller/controller/utils"
"k8s.io/client-go/tools/cache"
)
type Provisioner struct {
config *controller.Config
syncQueue *utils.TaskQueue
}
func init() {
p := &Provisioner{}
controller.RegisterController(p.GetName(), p)
}
func (p *Provisioner) Init(cfg *controller.Config) {
p.config = cfg
p.syncQueue = utils.NewTaskQueue("clusterprovisionersync", p.keyFunc, p.sync)
p.config.ClusterInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: p.handleClusterCreate,
DeleteFunc: p.handleClusterDelete,
UpdateFunc: p.handleClusterUpdate,
})
}
func (p *Provisioner) handleClusterCreate(obj interface{}) {
key, err := p.keyFunc(obj)
if err != nil {
return
}
logrus.Infof("Cluster created [%s]", key)
p.syncQueue.Enqueue(obj)
}
func (p *Provisioner) handleClusterUpdate(old, current interface{}) {
key, err := p.keyFunc(current)
if err != nil {
return
}
if configChanged(old, current) {
logrus.Infof("Cluster [%s] updated with the new config", key)
p.syncQueue.Enqueue(current)
}
}
func configChanged(old, current interface{}) bool {
oldC := old.(*clusterV1.Cluster)
newC := current.(*clusterV1.Cluster)
changed := false
if newC.Spec.AKSConfig != nil {
changed = !reflect.DeepEqual(newC.Spec.AKSConfig, oldC.Spec.AKSConfig)
} else if newC.Spec.GKEConfig != nil {
changed = !reflect.DeepEqual(newC.Spec.GKEConfig, oldC.Spec.GKEConfig)
} else if newC.Spec.AKSConfig != nil {
changed = !reflect.DeepEqual(newC.Spec.AKSConfig, oldC.Spec.AKSConfig)
}
return changed
}
func (p *Provisioner) handleClusterDelete(obj interface{}) {
key, err := p.keyFunc(obj)
if err != nil {
return
}
logrus.Infof("Cluster deleted: %s", key)
p.syncQueue.Enqueue(obj)
}
func (p *Provisioner) keyFunc(obj interface{}) (string, error) {
// Cluster object is not namespaced,
// but DeletionHandlingMetaNamespaceKeyFunc already handles it
return cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
}
func (p *Provisioner) sync(key string) error {
logrus.Infof("Syncing provisioning for cluster [%s]", key)
c, exists, err := p.config.ClusterInformer.GetStore().GetByKey(key)
if err != nil {
return fmt.Errorf("Failed to get cluster by name [%s] %v", key, err)
}
if !exists {
//TODO = handle removal using finalizer
err = p.deleteCluster(key)
if err != nil {
return fmt.Errorf("Failed to delete cluster [%s] %v", key, err)
}
} else {
cluster := c.(*clusterV1.Cluster)
err = p.createOrUpdateCluster(cluster)
if err != nil {
return fmt.Errorf("Failed to create/update cluster [%s] %v", key, err)
}
}
logrus.Infof("Successfully synced provisioning cluster [%s]", key)
return nil
}
func (p *Provisioner) deleteCluster(key string) error {
return nil
}
func (p *Provisioner) createOrUpdateCluster(cluster *clusterV1.Cluster) error {
//TODO call drivers to provision the cluster
return nil
}
func (p *Provisioner) Run(stopc <-chan struct{}) error {
go p.syncQueue.Run()
<-stopc
return nil
}
func (p *Provisioner) GetName() string {
return "clusterProvisioner"
}
func (p *Provisioner) Shutdown() {
p.syncQueue.Shutdown()
}