-
Notifications
You must be signed in to change notification settings - Fork 90
/
resize_pvc.go
284 lines (234 loc) · 10 KB
/
resize_pvc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
/*
Copyright 2021 The Cockroach Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actor
import (
"context"
"fmt"
"time"
"github.com/cenkalti/backoff"
"github.com/cockroachdb/cockroach-operator/pkg/features"
"github.com/cockroachdb/cockroach-operator/pkg/kube"
"github.com/cockroachdb/cockroach-operator/pkg/utilfeature"
api "github.com/cockroachdb/cockroach-operator/apis/v1alpha1"
"github.com/cockroachdb/cockroach-operator/pkg/condition"
"github.com/cockroachdb/cockroach-operator/pkg/resource"
"github.com/cockroachdb/cockroach-operator/pkg/update"
"github.com/cockroachdb/errors"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubetypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// backoffFactory is a replaceable global for backoff creation. It may be
// replaced with shorter times to allow testing of Wait___ functions without
// waiting the entire default period.
var backoffFactory = defaultBackoffFactory
// defaultBackoffFactory builds an exponential backoff policy that stops
// retrying once maxTime has elapsed.
func defaultBackoffFactory(maxTime time.Duration) backoff.BackOff {
	bo := backoff.NewExponentialBackOff()
	bo.MaxElapsedTime = maxTime
	return bo
}
// newResizePVC creates and returns a new resizePVC actor wired with the
// shared action plumbing plus the rest config needed for a direct clientset.
func newResizePVC(scheme *runtime.Scheme, cl client.Client, config *rest.Config) Actor {
	actor := &resizePVC{
		action: newAction("resize_pvc", scheme, cl),
		config: config,
	}
	return actor
}
// resizePVC is the Actor that resizes the PVCs backing a CrdbCluster's
// StatefulSet when the size requested in the CR changes.
type resizePVC struct {
	action
	// config is the rest configuration used to build a kubernetes clientset
	// for updating PVC objects directly.
	config *rest.Config
}
// GetActionType returns api.ResizePVCAction, the action type used to set the
// cluster status errors for this actor.
func (rp *resizePVC) GetActionType() api.ActionType {
	return api.ResizePVCAction
}
// Handles reports whether this actor should run: the cluster must already be
// initialized AND the ResizePVC feature gate must be enabled.
func (rp *resizePVC) Handles(conds []api.ClusterCondition) bool {
	if !condition.True(api.InitializedCondition, conds) {
		return false
	}
	return utilfeature.DefaultMutableFeatureGate.Enabled(features.ResizePVC)
}
// Act resizes the PVC volumes of the cluster's StatefulSet when the size
// requested in the CR's VolumeClaim differs from what is deployed. It is a
// no-op when the cluster has no VolumeClaim, and returns NotReadyErr (so the
// reconcile is retried later) while the StatefulSet is still updating or
// does not yet have all replicas up.
func (rp *resizePVC) Act(ctx context.Context, cluster *resource.Cluster) error {
	log := rp.log.WithValues("CrdbCluster", cluster.ObjectKey())
	// If we do not have a volume claim we do not have PVCs to resize.
	if cluster.Spec().DataStore.VolumeClaim == nil {
		log.Info("Skipping PVC resize as VolumeClaim does not exist")
		return nil
	}
	// Fetch the sts so its template size can be compared to the CR below.
	key := kubetypes.NamespacedName{
		Namespace: cluster.Namespace(),
		Name:      cluster.StatefulSetName(),
	}
	statefulSet := &appsv1.StatefulSet{}
	if err := rp.client.Get(ctx, key, statefulSet); err != nil {
		return errors.Wrap(err, "failed to fetch statefulset")
	}
	// TODO statefulSetIsUpdating is not quite working as expected.
	// I had to check status. We should look at the update code in partition
	// update to address this.
	if statefulSetIsUpdating(statefulSet) {
		return NotReadyErr{Err: errors.New("resize statefulset is updating, waiting for the update to finish")}
	}
	// Wait until every expected replica is up before touching PVCs; resizing
	// mid-rollout could race with pod creation.
	status := &statefulSet.Status
	if status.CurrentReplicas == 0 || status.CurrentReplicas < status.Replicas {
		log.Info("resize pvc statefulset does not have all replicas up")
		return NotReadyErr{Err: errors.New("resize pvc statefulset does not have all replicas up")}
	}
	// Maybe this should be an error since we should not have this state, but
	// we check defensively anyway.
	if len(statefulSet.Spec.VolumeClaimTemplates) == 0 {
		log.Info("Skipping PVC resize as PVCs do not exist")
		return nil
	}
	// A direct clientset is needed to update PVC objects in findAndResizePVC.
	clientset, err := kubernetes.NewForConfig(rp.config)
	if err != nil {
		return errors.Wrapf(err, "failed to create kubernetes clientset")
	}
	// Compare the deployed template size (first template) with the CR's
	// requested size.
	stsStorageSizeDeployed := statefulSet.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests.Storage()
	stsStorageSizeSet := cluster.Spec().DataStore.VolumeClaim.PersistentVolumeClaimSpec.Resources.Requests.Storage()
	// If the sizes match there is nothing to resize.
	if stsStorageSizeDeployed.Equal(stsStorageSizeSet.DeepCopy()) {
		log.Info("Skipping PVC resize as sizes match")
		return nil
	}
	log.Info("Starting PVC resize")
	// Find all of the in-use PVCs and resize them.
	if err := rp.findAndResizePVC(ctx, statefulSet, cluster, clientset); err != nil {
		return errors.Wrapf(err, "updating PVCs for statefulset %s.%s", cluster.Namespace(), cluster.StatefulSetName())
	}
	log.Info("Starting updating sts")
	// Update the STS with the correct volume size, in case more pods are created.
	// The original sts is deleted without deleting its Pods, and a new sts with
	// the updated template is created in its place.
	if err := rp.updateSts(ctx, statefulSet, cluster); err != nil {
		return errors.Wrapf(err, "updating statefulset %s.%s", cluster.Namespace(), cluster.StatefulSetName())
	}
	// TODO this is not working so we will need to patch the sts
	// with a value that will force a restart.
	// We are thinking that patching an annotation will help.
	/*
		if !cluster.Spec().DataStore.SupportsAutoResize {
			log.Info("Starting rolling sts")
			// Roll the entire STS in order for the Pods to resize
			if err := rp.rollSts(ctx, cluster, clientset); err != nil {
				return errors.Wrapf(err, "error restarting statefulset %s.%s", cluster.Namespace(), cluster.StatefulSetName())
			}
		} else {
			log.Info("Volumes support autoresizing so not restarting STS Pods")
		}*/
	log.Info("PVC resize completed")
	CancelLoop(ctx)
	return nil
}
// updateSts updates the size of an STS' VolumeClaimTemplate to match the new
// size in the CR. VolumeClaimTemplates are immutable, so the sts is deleted
// with orphan propagation (leaving its Pods running) and then re-created,
// retrying with exponential backoff for up to five minutes.
func (rp *resizePVC) updateSts(ctx context.Context, sts *appsv1.StatefulSet, cluster *resource.Cluster) error {
	// Delete the original sts, but keep its Pods alive.
	policy := metav1.DeletePropagationOrphan
	deleteOpts := &client.DeleteOptions{PropagationPolicy: &policy}
	if err := rp.client.Delete(ctx, sts, deleteOpts); err != nil {
		return err
	}
	// Re-create the sts, retrying until it succeeds or the backoff expires.
	bo := backoffFactory(5 * time.Minute)
	return backoff.Retry(func() error {
		return rp.recreateSTS(ctx, cluster)
	}, backoff.WithContext(bo, ctx))
}
// recreateSTS rebuilds and saves the StatefulSet using the same
// StatefulSetBuilder that Deploy runs, so the re-created sts carries the new
// PVC size from the cluster spec. A non-nil error signals the backoff loop in
// updateSts to retry.
func (rp *resizePVC) recreateSTS(ctx context.Context, cluster *resource.Cluster) error {
	log := rp.log.WithValues("CrdbCluster", cluster.ObjectKey())
	managed := resource.NewManagedKubeResource(ctx, rp.client, cluster, kube.AnnotatingPersister)
	reconciler := resource.Reconciler{
		ManagedResource: managed,
		Builder: resource.StatefulSetBuilder{
			Cluster:  cluster,
			Selector: managed.Labels.Selector(),
		},
		Owner:  cluster.Unwrap(),
		Scheme: rp.scheme,
	}
	if _, err := reconciler.Reconcile(); err != nil {
		log.Info("unable to re-create sts, will retry")
		return err
	}
	return nil
}
// findAndResizePVC finds all active PVCs and resizes them to the new size
// contained in the cluster definition.
//
// K8s doesn't provide a way to tell if a PVC or PV is currently in use by a
// pod. However, only pods with an ordinal < the sts' Replicas exist, so a PVC
// named "<template>-<sts>-<ordinal>" is in use exactly when its ordinal is
// below Replicas. We build the set of those in-use names and resize only its
// members; claims left over from scaled-down pods are skipped.
func (rp *resizePVC) findAndResizePVC(ctx context.Context, sts *appsv1.StatefulSet, cluster *resource.Cluster,
	clientset *kubernetes.Clientset) error {
	log := rp.log.WithValues("CrdbCluster", cluster.ObjectKey())
	log.Info("starting finding and resizing all PVCs")

	// Names of the PVCs backing live pods: one entry per volume-claim
	// template per ordinal below Replicas.
	// (The previous version also built a `prefixes` slice here that was never
	// read; it has been removed as dead code.)
	pvcsToKeep := make(map[string]bool, int(*sts.Spec.Replicas)*len(sts.Spec.VolumeClaimTemplates))
	for _, pvct := range sts.Spec.VolumeClaimTemplates {
		for i := int32(0); i < *sts.Spec.Replicas; i++ {
			name := fmt.Sprintf("%s-%s-%d", pvct.Name, sts.Name, i)
			pvcsToKeep[name] = true
		}
	}

	selector, err := metav1.LabelSelectorAsSelector(sts.Spec.Selector)
	if err != nil {
		return errors.Wrap(err, "converting statefulset selector to metav1 selector")
	}

	// List every PVC carrying this statefulset's labels.
	pvcs, err := clientset.CoreV1().PersistentVolumeClaims(cluster.Namespace()).List(ctx, metav1.ListOptions{
		LabelSelector: selector.String(),
	})
	if err != nil {
		return errors.Wrap(err, "finding PVCs for resizing")
	}

	log.Info("resizing PVCs")
	// The requested size is the same for every claim; look it up once.
	size := cluster.Spec().DataStore.VolumeClaim.PersistentVolumeClaimSpec.Resources.Requests.Storage()
	for _, pvc := range pvcs.Items {
		// Only resize PVCs that are still in use by a live pod.
		if !pvcsToKeep[pvc.Name] {
			continue
		}
		pvc.Spec.Resources.Requests[v1.ResourceStorage] = *size
		if _, err := clientset.CoreV1().PersistentVolumeClaims(cluster.Namespace()).Update(ctx, &pvc, metav1.UpdateOptions{}); err != nil {
			return errors.Wrap(err, "error resizing PVCs")
		}
		log.Info(fmt.Sprintf("resized %s", pvc.Name))
	}
	log.Info("found and resized all PVCs")
	return nil
}
// rollSts performs a rolling restart of the cluster's StatefulSet pods via
// the shared update package.
func (rp *resizePVC) rollSts(ctx context.Context, cluster *resource.Cluster, clientset *kubernetes.Clientset) error {
	roach := &update.UpdateRoach{
		StsName:      cluster.StatefulSetName(),
		StsNamespace: cluster.Namespace(),
	}
	k8sCluster := &update.UpdateCluster{
		Clientset:             clientset,
		PodUpdateTimeout:      10 * time.Minute,
		PodMaxPollingInterval: 30 * time.Minute,
		Sleeper:               update.NewSleeper(1 * time.Minute),
	}
	return update.RollingRestart(ctx, roach, k8sCluster, rp.log)
}