/
cluster_config.go
207 lines (180 loc) · 6.24 KB
/
cluster_config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/*
Copyright 2019 The Rook Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"encoding/json"
"os"
"sync"
"github.com/coreos/pkg/capnslog"
"github.com/pkg/errors"
cephconfig "github.com/rook/rook/pkg/daemon/ceph/config"
"github.com/rook/rook/pkg/operator/k8sutil"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
var (
	// logger is the package-level logger for ceph-csi operator code.
	logger = capnslog.NewPackageLogger("github.com/rook/rook", "ceph-csi")
)
// csiClusterConfigEntry is a single element of the ceph-csi cluster
// configuration: it maps one cluster ID to that cluster's mon endpoints.
type csiClusterConfigEntry struct {
	ClusterID string `json:"clusterID"`
	Monitors []string `json:"monitors"`
}

// csiClusterConfig is the JSON array of cluster entries that ceph-csi
// consumes from its config map.
type csiClusterConfig []csiClusterConfigEntry
// FormatCsiClusterConfig returns a json-formatted string containing
// the cluster-to-mon mapping required to configure ceph csi.
func FormatCsiClusterConfig(
	clusterKey string, mons map[string]*cephconfig.MonInfo) (string, error) {
	// collect endpoints into a non-nil slice so an empty mon map still
	// serializes as "[]" rather than "null"
	endpoints := []string{}
	for _, mon := range mons {
		endpoints = append(endpoints, mon.Endpoint)
	}
	cc := csiClusterConfig{
		{ClusterID: clusterKey, Monitors: endpoints},
	}
	blob, err := json.Marshal(cc)
	if err != nil {
		return "", errors.Wrapf(err, "failed to marshal csi cluster config")
	}
	return string(blob), nil
}
// parseCsiClusterConfig decodes a json string into a csiClusterConfig.
// On failure the zero value is returned together with a wrapped error.
func parseCsiClusterConfig(data string) (csiClusterConfig, error) {
	var cc csiClusterConfig
	if err := json.Unmarshal([]byte(data), &cc); err != nil {
		return cc, errors.Wrapf(err, "failed to parse csi cluster config")
	}
	return cc, nil
}
// formatCsiClusterConfig encodes the given csiClusterConfig as a json
// string suitable for storing in the csi config map.
func formatCsiClusterConfig(cc csiClusterConfig) (string, error) {
	encoded, err := json.Marshal(cc)
	if err != nil {
		return "", errors.Wrapf(err, "failed to marshal csi cluster config")
	}
	return string(encoded), nil
}
// monEndpoints collects the endpoint address of every mon in the map.
// The result is always non-nil so it serializes as "[]" when empty.
func monEndpoints(mons map[string]*cephconfig.MonInfo) []string {
	eps := make([]string, 0, len(mons))
	for _, mon := range mons {
		eps = append(eps, mon.Endpoint)
	}
	return eps
}
// UpdateCsiClusterConfig returns a json-formatted string containing
// the cluster-to-mon mapping required to configure ceph csi. The entry
// matching clusterKey in curr has its monitor list refreshed; if no
// such entry exists, a new one is appended.
func UpdateCsiClusterConfig(
	curr, clusterKey string, mons map[string]*cephconfig.MonInfo) (string, error) {
	cc, err := parseCsiClusterConfig(curr)
	if err != nil {
		return "", errors.Wrapf(err, "failed to parse current csi cluster config")
	}
	// Mutate the matching entry in place by index; the original code
	// shadowed an outer variable with the range variable and copied the
	// entry back, which was easy to misread.
	found := false
	for i := range cc {
		if cc[i].ClusterID == clusterKey {
			cc[i].Monitors = monEndpoints(mons)
			found = true
			break
		}
	}
	if !found {
		// no entry for this cluster yet; append a fresh one
		cc = append(cc, csiClusterConfigEntry{
			ClusterID: clusterKey,
			Monitors:  monEndpoints(mons),
		})
	}
	return formatCsiClusterConfig(cc)
}
// CreateCsiConfigMap creates an empty config map that will be later used
// to provide cluster configuration to ceph-csi. If a config map already
// exists, it will return it.
func CreateCsiConfigMap(namespace string, clientset kubernetes.Interface) (*v1.ConfigMap, error) {
	cm := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      ConfigName,
			Namespace: namespace,
		},
		// seed the config with an empty json array of clusters
		Data: map[string]string{
			ConfigKey: "[]",
		},
	}
	created, err := clientset.CoreV1().ConfigMaps(namespace).Create(cm)
	if err == nil {
		return created, nil
	}
	if k8serrors.IsAlreadyExists(err) {
		// a previous reconcile (or another operator) made it first;
		// hand back the existing map instead of failing
		return getCsiConfigMap(namespace, clientset)
	}
	return nil, errors.Wrapf(err, "failed to create initial csi config map %v (in %v)", cm.Name, namespace)
}
// getCsiConfigMap fetches the pre-existing csi config map from the given
// namespace, wrapping any lookup error with context.
func getCsiConfigMap(namespace string, clientset kubernetes.Interface) (*v1.ConfigMap, error) {
	found, err := clientset.CoreV1().ConfigMaps(namespace).Get(ConfigName, metav1.GetOptions{})
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get pre-existing csi config map %q (in %q)",
			ConfigName, namespace)
	}
	// err is known to be nil here; return an explicit nil instead of
	// re-returning the already-checked variable.
	return found, nil
}
// DeleteCsiConfigMap removes the csi config map from the given namespace.
func DeleteCsiConfigMap(namespace string, clientset kubernetes.Interface) error {
	err := clientset.CoreV1().ConfigMaps(namespace).Delete(ConfigName, &metav1.DeleteOptions{})
	if err != nil {
		return errors.Wrapf(err, "failed to delete CSI driver configuration and deployments")
	}
	return nil
}
// SaveClusterConfig updates the config map used to provide ceph-csi with
// basic cluster configuration. The clusterNamespace and clusterInfo are
// used to determine what "cluster" in the config map will be updated and
// the clusterNamespace value is expected to match the clusterID
// value that is provided to ceph-csi in the storage class.
// The locker l is typically a mutex and is used to prevent the config
// map from being updated for multiple clusters simultaneously.
func SaveClusterConfig(
	clientset kubernetes.Interface, clusterNamespace string,
	clusterInfo *cephconfig.ClusterInfo, l sync.Locker) error {
	if !CSIEnabled() {
		return nil
	}
	l.Lock()
	defer l.Unlock()
	// csi is deployed into the same namespace as the operator
	csiNamespace := os.Getenv(k8sutil.PodNamespaceEnvVar)
	if csiNamespace == "" {
		return errors.Errorf("namespace value missing for %s", k8sutil.PodNamespaceEnvVar)
	}
	logger.Debugf("Using %+v for CSI ConfigMap Namespace", csiNamespace)
	// fetch current ConfigMap contents
	configMap, err := clientset.CoreV1().ConfigMaps(csiNamespace).Get(
		ConfigName, metav1.GetOptions{})
	if err != nil {
		return errors.Wrapf(err, "failed to fetch current csi config map")
	}
	// guard against a config map that exists without a data section;
	// writing into a nil map below would panic
	if configMap.Data == nil {
		configMap.Data = map[string]string{}
	}
	// update ConfigMap contents for current cluster
	currData := configMap.Data[ConfigKey]
	if currData == "" {
		currData = "[]"
	}
	newData, err := UpdateCsiClusterConfig(
		currData, clusterNamespace, clusterInfo.Monitors)
	if err != nil {
		return errors.Wrapf(err, "failed to update csi config map data")
	}
	configMap.Data[ConfigKey] = newData
	// update ConfigMap with new contents
	if _, err := clientset.CoreV1().ConfigMaps(csiNamespace).Update(configMap); err != nil {
		return errors.Wrapf(err, "failed to update csi config map")
	}
	return nil
}