-
Notifications
You must be signed in to change notification settings - Fork 168
/
Copy pathexecution_cluster.go
104 lines (83 loc) · 2.6 KB
/
execution_cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package store
import (
"fmt"
"sync"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1"
"github.com/caicloud/cyclone/pkg/common"
)
// ControllerRegistry manages all controllers created for each cluster.
// When a new Cluster found, create controller for it and add its stop
// channel to this registry if not exist. When a Cluster removed, close
// the channel in this map, and delete the key.
var ControllerRegistry map[string]*ClusterController
var lock *sync.Mutex
// ClusterController ...
type ClusterController struct {
// Cluster ...
Cluster *v1alpha1.ExecutionCluster
// Cluster client
Client kubernetes.Interface
// Channel that can be used to stop the controller
StopCh chan struct{}
}
// NewClusterChan is channels for clusters that got created.
var NewClusterChan chan *ClusterController
func init() {
ControllerRegistry = make(map[string]*ClusterController)
lock = &sync.Mutex{}
NewClusterChan = make(chan *ClusterController)
}
// RegisterClusterController register a cluster to this registry.
func RegisterClusterController(cluster *v1alpha1.ExecutionCluster) error {
lock.Lock()
defer lock.Unlock()
if _, ok := ControllerRegistry[cluster.Name]; ok {
log.Infof("cluster %s already registered, skip it", cluster.Name)
return nil
}
log.Infof("register cluster controller for cluster %s", cluster.Name)
client, err := getClusterClient(cluster)
if err != nil {
log.Errorf("create client for cluster %s error: %v", cluster.Name, err)
return err
}
clusterController := &ClusterController{
Cluster: cluster,
Client: client,
StopCh: make(chan struct{}),
}
NewClusterChan <- clusterController
ControllerRegistry[cluster.Name] = clusterController
return nil
}
// RemoveClusterController stop and remove cluster from this registry.
func RemoveClusterController(cluster *v1alpha1.ExecutionCluster) error {
lock.Lock()
defer lock.Unlock()
c, ok := ControllerRegistry[cluster.Name]
if !ok {
return nil
}
close(c.StopCh)
delete(ControllerRegistry, cluster.Name)
return nil
}
// GetClusterClient gets cluster client with the given cluster name
func GetClusterClient(name string) kubernetes.Interface {
lock.Lock()
defer lock.Unlock()
c, ok := ControllerRegistry[name]
if ok {
return c.Client
}
return nil
}
// getClusterClient get kube client from cluster crd
func getClusterClient(cluster *v1alpha1.ExecutionCluster) (kubernetes.Interface, error) {
if cluster == nil {
return nil, fmt.Errorf("nil cluster")
}
return common.NewClusterClient(&cluster.Spec.Credential, cluster.Name == common.ControlClusterName)
}