/
persistent_volume_pruner.go
264 lines (231 loc) · 9.01 KB
/
persistent_volume_pruner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
/*
Copyright 2021 The Cockroach Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scale
import (
"context"
"fmt"
"sort"
"strings"
"github.com/cockroachdb/errors"
"github.com/go-logr/logr"
"go.uber.org/zap/zapcore"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
// PersistentVolumePruner provides a .Prune method to remove unused statefulset
// PVC and their underlying PVs. The underlying PVs SHOULD have their reclaim
// policy set to delete.
type PersistentVolumePruner struct {
	// Namespace is the Kubernetes namespace containing the statefulset and
	// its PVCs.
	Namespace string
	// StatefulSet is the name of the statefulset whose unused PVCs will be
	// pruned.
	StatefulSet string
	// ClientSet is the Kubernetes API client used for all get/list/watch/
	// delete operations.
	ClientSet kubernetes.Interface
	// Logger receives informational and warning messages; levels are mapped
	// from zapcore levels.
	Logger logr.Logger
}
// watchStatefulset establishes a watch on the given statefulset in a
// goroutine and will call the provided cancel function whenever a modification
// to the .Spec.Replicas field OR an unexpected (non-modification) event is
// observed.
// It may be used to detect concurrent modification of a statefulset when a
// multi-step operation is taking place that depends on .Spec.Replicas staying
// the same for every step.
// watchStatefulset does not block and relies on the context being cancelled to
// prevent leakage of goroutines.
func (p *PersistentVolumePruner) watchStatefulset(
	ctx context.Context,
	cancel context.CancelFunc,
	sts *appsv1.StatefulSet,
) error {
	// SingleObject scopes the watch to exactly this statefulset (by name and
	// resource version).
	w, err := p.ClientSet.AppsV1().StatefulSets(p.Namespace).Watch(ctx, metav1.SingleObject(sts.ObjectMeta))
	if err != nil {
		return errors.Wrapf(err, "establishing watch on statefulset %s.%s", p.Namespace, p.StatefulSet)
	}

	p.Logger.V(int(zapcore.InfoLevel)).Info("established statefulset watch", "name", p.StatefulSet, "namespace", p.Namespace)

	go func() {
		defer w.Stop()

		for {
			// First, select without our result channel as an option. If ctx
			// is cancelled and the watch is closed (happens in tests mostly)
			// we'd otherwise generate some log spam on zero-value events, so
			// give cancellation priority before reading from the watch.
			select {
			case <-ctx.Done():
				return
			default:
			}

			select {
			// NOTE: once cancel() has been called, we'll hit this case due to
			// the for loop, which will prevent goroutines from leaking.
			case <-ctx.Done():
				return
			case evt := <-w.ResultChan():
				switch evt.Type {
				case watch.Modified:
					if modified, ok := evt.Object.(*appsv1.StatefulSet); ok {
						// Only cancel if Replicas has changed. If an update
						// happens while pruning, it's still safe to run.
						// Technically, it's safe to continue if Replicas
						// decreases. However any change to replicas is
						// unexpected so we'll err on the side of caution for
						// now.
						if modified.Spec.Replicas == nil || *modified.Spec.Replicas != *sts.Spec.Replicas {
							cancel()
						}
					}
				default:
					// Cancel on any unexpected events (e.g. Deleted, Error);
					// the multi-step caller must not proceed on them.
					p.Logger.V(int(zapcore.WarnLevel)).Info("saw an unexpected event", "event", evt)
					cancel()
				}
			}
		}
	}()

	return nil
}
// pvcsToDelete locates all PVCs that were provisioned for the given
// statefulset but are not currently in use. A PVC is considered in use when
// its ordinal is less than the statefulset's expected replica count.
func (p *PersistentVolumePruner) pvcsToDelete(ctx context.Context, sts *appsv1.StatefulSet) ([]corev1.PersistentVolumeClaim, error) {
	// K8s doesn't provide a way to tell whether a PVC or PV is currently
	// mounted by a pod. It is, however, safe to assume that any PVC whose
	// ordinal is >= the statefulset's Replicas is unused, because only pods
	// with ordinal < Replicas exist. We therefore precompute the set of PVC
	// names considered in use and skip anything in that set.
	replicas := *sts.Spec.Replicas
	templates := sts.Spec.VolumeClaimTemplates

	inUse := make(map[string]bool, int(replicas)*len(templates))
	prefixes := make([]string, 0, len(templates))
	for _, tmpl := range templates {
		prefixes = append(prefixes, fmt.Sprintf("%s-%s-", tmpl.Name, sts.Name))
		for ord := int32(0); ord < replicas; ord++ {
			inUse[fmt.Sprintf("%s-%s-%d", tmpl.Name, sts.Name, ord)] = true
		}
	}

	selector, err := metav1.LabelSelectorAsSelector(sts.Spec.Selector)
	if err != nil {
		return nil, errors.Wrap(err, "converting statefulset selector to metav1 selector")
	}

	pvcs, err := p.ClientSet.CoreV1().PersistentVolumeClaims(p.Namespace).List(ctx, metav1.ListOptions{
		LabelSelector: selector.String(),
	})
	if err != nil {
		return nil, errors.Wrap(err, "listing PVCs to consider deleting")
	}

	// A candidate must match the statefulset-managed PVC naming convention:
	// <mount name>-<sts name>-<ordinal>.
	matchesTemplate := func(name string) bool {
		for _, prefix := range prefixes {
			if strings.HasPrefix(name, prefix) {
				return true
			}
		}
		return false
	}

	// Filter in place: drop PVCs that are in use OR don't carry one of our
	// expected name prefixes.
	kept := pvcs.Items[:0]
	for _, pvc := range pvcs.Items {
		if inUse[pvc.Name] || !matchesTemplate(pvc.Name) {
			continue
		}
		kept = append(kept, pvc)
	}

	// Lexically sort pvcs to ensure we're deleting from lowest to highest.
	// This isn't incredibly important but may save us from some race
	// conditions: PVCs are provisioned/reused from lowest to highest, so if a
	// new replica appears while we're pruning and we don't detect it fast
	// enough, the PVC it wants will hopefully already be deleting, forcing a
	// fresh one to be created. This is a hedge, not a guarantee — our coarse
	// grained cluster locking should prevent this from ever running
	// concurrently anyway.
	sort.Slice(kept, func(a, b int) bool {
		return kept[a].Name < kept[b].Name
	})

	return kept, nil
}
// Prune locates and removes all PVCs that belong to a given statefulset but
// are not in use. Use is determined by the .Spec.Replicas field on the
// statefulset and the PVCs' ordinals. Prune will return an error if
// unexpected PVCs are encountered (conflicting labels) or the referenced
// statefulset's .Spec.Replicas field changes while this operation is running.
// The underlying PVs' reclaim policy should be set to delete; other options
// may result in leaking volumes which cost us money.
func (p *PersistentVolumePruner) Prune(ctx context.Context) error {
	sts, err := p.ClientSet.AppsV1().StatefulSets(p.Namespace).Get(
		ctx,
		p.StatefulSet,
		metav1.GetOptions{},
	)
	if err != nil {
		return errors.Wrapf(err, "getting statefulset %s.%s", p.Namespace, p.StatefulSet)
	}

	// Sanity check so we can dereference Replicas without panicking.
	if sts.Spec.Replicas == nil {
		return errors.New("statefulset had nil .Replicas")
	}

	// TODO we should pass in the controller context here
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// watchStatefulset will cancel our ctx if it detects any modifications to
	// sts.Spec.Replicas OR on any unexpected events, namely deletions.
	if err := p.watchStatefulset(ctx, cancel, sts); err != nil {
		return errors.Wrap(err, "setting up statefulset watcher")
	}

	pvcs, err := p.pvcsToDelete(ctx, sts)
	if err != nil {
		return errors.Wrap(err, "finding pvcs to prune")
	}

	// 60 seconds was picked arbitrarily.
	gracePeriod := int64(60)
	propagationPolicy := metav1.DeletePropagationForeground

	for _, pvc := range pvcs {
		// Ensure that our context is still active. It will be canceled if a
		// change to sts.Spec.Replicas is detected.
		select {
		case <-ctx.Done():
			return errors.New("concurrent statefulset modification detected")
		default:
		}

		p.Logger.V(int(zapcore.DebugLevel)).Info("deleting PVC", "name", pvc.Name)

		if err := p.ClientSet.CoreV1().PersistentVolumeClaims(p.Namespace).Delete(ctx, pvc.Name, metav1.DeleteOptions{
			GracePeriodSeconds: &gracePeriod,
			// Wait for the underlying PV to be deleted before moving on to
			// the next volume.
			PropagationPolicy: &propagationPolicy,
			Preconditions: &metav1.Preconditions{
				// Ensure that this PVC is the same PVC that we slated for
				// deletion. If for some reason there are concurrent scale
				// jobs running, this will prevent us from re-deleting a PVC
				// that was removed and recreated.
				UID: &pvc.UID,
				// Ensure that this PVC has not changed since we fetched it.
				// This check doesn't help very much, as a PVC is not actually
				// modified when it's mounted to a pod.
				ResourceVersion: &pvc.ResourceVersion,
			},
		}); err != nil {
			// Fixed typo in wrap message: was "delting pvc %s".
			return errors.Wrapf(err, "deleting pvc %s", pvc.Name)
		}
	}

	return nil
}