forked from vmware-tanzu/velero
/
item_backupper.go
339 lines (279 loc) · 10.3 KB
/
item_backupper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/*
Copyright 2017 the Heptio Ark contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package backup
import (
"archive/tar"
"encoding/json"
"path/filepath"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
api "github.com/heptio/ark/pkg/apis/ark/v1"
"github.com/heptio/ark/pkg/client"
"github.com/heptio/ark/pkg/cloudprovider"
"github.com/heptio/ark/pkg/discovery"
"github.com/heptio/ark/pkg/util/collections"
"github.com/heptio/ark/pkg/util/logging"
)
type itemBackupperFactory interface {
newItemBackupper(
backup *api.Backup,
namespaces, resources *collections.IncludesExcludes,
backedUpItems map[itemKey]struct{},
actions []resolvedAction,
podCommandExecutor podCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
snapshotService cloudprovider.SnapshotService,
) ItemBackupper
}
type defaultItemBackupperFactory struct{}
func (f *defaultItemBackupperFactory) newItemBackupper(
backup *api.Backup,
namespaces, resources *collections.IncludesExcludes,
backedUpItems map[itemKey]struct{},
actions []resolvedAction,
podCommandExecutor podCommandExecutor,
tarWriter tarWriter,
resourceHooks []resourceHook,
dynamicFactory client.DynamicFactory,
discoveryHelper discovery.Helper,
snapshotService cloudprovider.SnapshotService,
) ItemBackupper {
ib := &defaultItemBackupper{
backup: backup,
namespaces: namespaces,
resources: resources,
backedUpItems: backedUpItems,
actions: actions,
tarWriter: tarWriter,
resourceHooks: resourceHooks,
dynamicFactory: dynamicFactory,
discoveryHelper: discoveryHelper,
snapshotService: snapshotService,
itemHookHandler: &defaultItemHookHandler{
podCommandExecutor: podCommandExecutor,
},
}
// this is for testing purposes
ib.additionalItemBackupper = ib
return ib
}
type ItemBackupper interface {
backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource) error
}
type defaultItemBackupper struct {
backup *api.Backup
namespaces *collections.IncludesExcludes
resources *collections.IncludesExcludes
backedUpItems map[itemKey]struct{}
actions []resolvedAction
tarWriter tarWriter
resourceHooks []resourceHook
dynamicFactory client.DynamicFactory
discoveryHelper discovery.Helper
snapshotService cloudprovider.SnapshotService
itemHookHandler itemHookHandler
additionalItemBackupper ItemBackupper
}
var podsGroupResource = schema.GroupResource{Group: "", Resource: "pods"}
var namespacesGroupResource = schema.GroupResource{Group: "", Resource: "namespaces"}
// backupItem backs up an individual item to tarWriter. The item may be excluded based on the
// namespaces IncludesExcludes list.
func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstructured, groupResource schema.GroupResource) error {
metadata, err := meta.Accessor(obj)
if err != nil {
return err
}
namespace := metadata.GetNamespace()
name := metadata.GetName()
log := logger.WithField("name", name)
if namespace != "" {
log = log.WithField("namespace", namespace)
}
// NOTE: we have to re-check namespace & resource includes/excludes because it's possible that
// backupItem can be invoked by a custom action.
if namespace != "" && !ib.namespaces.ShouldInclude(namespace) {
log.Info("Excluding item because namespace is excluded")
return nil
}
// NOTE: we specifically allow namespaces to be backed up even if IncludeClusterResources is
// false.
if namespace == "" && groupResource != namespacesGroupResource && ib.backup.Spec.IncludeClusterResources != nil && !*ib.backup.Spec.IncludeClusterResources {
log.Info("Excluding item because resource is cluster-scoped and backup.spec.includeClusterResources is false")
return nil
}
if !ib.resources.ShouldInclude(groupResource.String()) {
log.Info("Excluding item because resource is excluded")
return nil
}
key := itemKey{
resource: groupResource.String(),
namespace: namespace,
name: name,
}
if _, exists := ib.backedUpItems[key]; exists {
log.Info("Skipping item because it's already been backed up.")
return nil
}
ib.backedUpItems[key] = struct{}{}
log.Info("Backing up resource")
// Never save status
delete(obj.UnstructuredContent(), "status")
log.Info("Executing pre hooks")
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.resourceHooks, hookPhasePre); err != nil {
return err
}
for _, action := range ib.actions {
if !action.resourceIncludesExcludes.ShouldInclude(groupResource.String()) {
log.Debug("Skipping action because it does not apply to this resource")
continue
}
if namespace != "" && !action.namespaceIncludesExcludes.ShouldInclude(namespace) {
log.Debug("Skipping action because it does not apply to this namespace")
continue
}
if !action.selector.Matches(labels.Set(metadata.GetLabels())) {
log.Debug("Skipping action because label selector does not match")
continue
}
log.Info("Executing custom action")
if logSetter, ok := action.ItemAction.(logging.LogSetter); ok {
logSetter.SetLog(log)
}
if updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backup); err == nil {
obj = updatedItem
for _, additionalItem := range additionalItemIdentifiers {
gvr, resource, err := ib.discoveryHelper.ResourceFor(additionalItem.GroupResource.WithVersion(""))
if err != nil {
return err
}
client, err := ib.dynamicFactory.ClientForGroupVersionResource(gvr.GroupVersion(), resource, additionalItem.Namespace)
if err != nil {
return err
}
additionalItem, err := client.Get(additionalItem.Name, metav1.GetOptions{})
if err != nil {
return err
}
ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource())
}
} else {
// We want this to show up in the log file at the place where the error occurs. When we return
// the error, it get aggregated with all the other ones at the end of the backup, making it
// harder to tell when it happened.
log.WithError(err).Error("error executing custom action")
return errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
}
}
if groupResource == pvGroupResource {
if ib.snapshotService == nil {
log.Debug("Skipping Persistent Volume snapshot because they're not enabled.")
} else {
if err := ib.takePVSnapshot(obj, ib.backup, log); err != nil {
return err
}
}
}
log.Info("Executing post hooks")
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.resourceHooks, hookPhasePost); err != nil {
return err
}
var filePath string
if namespace != "" {
filePath = filepath.Join(api.ResourcesDir, groupResource.String(), api.NamespaceScopedDir, namespace, name+".json")
} else {
filePath = filepath.Join(api.ResourcesDir, groupResource.String(), api.ClusterScopedDir, name+".json")
}
itemBytes, err := json.Marshal(obj.UnstructuredContent())
if err != nil {
return errors.WithStack(err)
}
hdr := &tar.Header{
Name: filePath,
Size: int64(len(itemBytes)),
Typeflag: tar.TypeReg,
Mode: 0755,
ModTime: time.Now(),
}
if err := ib.tarWriter.WriteHeader(hdr); err != nil {
return errors.WithStack(err)
}
if _, err := ib.tarWriter.Write(itemBytes); err != nil {
return errors.WithStack(err)
}
return nil
}
// zoneLabel is the label that stores availability-zone info
// on PVs
const zoneLabel = "failure-domain.beta.kubernetes.io/zone"
// takePVSnapshot triggers a snapshot for the volume/disk underlying a PersistentVolume if the provided
// backup has volume snapshots enabled and the PV is of a compatible type. Also records cloud
// disk type and IOPS (if applicable) to be able to restore to current state later.
func (ib *defaultItemBackupper) takePVSnapshot(pv runtime.Unstructured, backup *api.Backup, log logrus.FieldLogger) error {
log.Info("Executing takePVSnapshot")
if backup.Spec.SnapshotVolumes != nil && !*backup.Spec.SnapshotVolumes {
log.Info("Backup has volume snapshots disabled; skipping volume snapshot action.")
return nil
}
metadata, err := meta.Accessor(pv)
if err != nil {
return errors.WithStack(err)
}
name := metadata.GetName()
var pvFailureDomainZone string
labels := metadata.GetLabels()
if labels[zoneLabel] != "" {
pvFailureDomainZone = labels[zoneLabel]
} else {
log.Infof("label %q is not present on PersistentVolume", zoneLabel)
}
volumeID, err := ib.snapshotService.GetVolumeID(pv)
if err != nil {
return errors.Wrapf(err, "error getting volume ID for PersistentVolume")
}
if volumeID == "" {
log.Info("PersistentVolume is not a supported volume type for snapshots, skipping.")
return nil
}
log = log.WithField("volumeID", volumeID)
log.Info("Snapshotting PersistentVolume")
snapshotID, err := ib.snapshotService.CreateSnapshot(volumeID, pvFailureDomainZone)
if err != nil {
// log+error on purpose - log goes to the per-backup log file, error goes to the backup
log.WithError(err).Error("error creating snapshot")
return errors.WithMessage(err, "error creating snapshot")
}
volumeType, iops, err := ib.snapshotService.GetVolumeInfo(volumeID, pvFailureDomainZone)
if err != nil {
log.WithError(err).Error("error getting volume info")
return errors.WithMessage(err, "error getting volume info")
}
if backup.Status.VolumeBackups == nil {
backup.Status.VolumeBackups = make(map[string]*api.VolumeBackupInfo)
}
backup.Status.VolumeBackups[name] = &api.VolumeBackupInfo{
SnapshotID: snapshotID,
Type: volumeType,
Iops: iops,
AvailabilityZone: pvFailureDomainZone,
}
return nil
}