-
Notifications
You must be signed in to change notification settings - Fork 906
/
migration_volumes.go
324 lines (275 loc) · 11 KB
/
migration_volumes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
package migration
import (
"fmt"
"io"
"net/http"
backupConfig "github.com/canonical/lxd/lxd/backup/config"
"github.com/canonical/lxd/lxd/operations"
"github.com/canonical/lxd/shared"
"github.com/canonical/lxd/shared/api"
"github.com/canonical/lxd/shared/ioprogress"
"github.com/canonical/lxd/shared/units"
)
// Info represents the index frame sent if supported.
type Info struct {
	// Config is the equivalent of backup.yaml but embedded in the migration index header.
	Config *backupConfig.Config `json:"config,omitempty" yaml:"config,omitempty"`
}
// InfoResponse represents the response to the index frame sent if supported.
// Right now this doesn't contain anything useful, it's just used to indicate receipt of the index header.
// But in the future the intention is to use it to allow the target to send back additional information to
// the source about which frames (such as snapshots) it needs for the migration after having inspected the
// Info index header.
type InfoResponse struct {
	StatusCode int    // HTTP-style status code; http.StatusOK indicates success (see Err).
	Error      string // Error message accompanying a non-OK StatusCode.
	Refresh    *bool  // This is used to let the source know whether to actually refresh a volume.
}
// Err returns the error of the response, or nil if the response indicates
// success (http.StatusOK).
func (r *InfoResponse) Err() error {
	if r.StatusCode != http.StatusOK {
		// Pass the message through an explicit "%s" verb: r.Error is runtime
		// data, and using it directly as the format string would cause any
		// "%" sequences it contains to be misinterpreted as formatting verbs
		// (flagged by go vet's printf check).
		return api.StatusErrorf(r.StatusCode, "%s", r.Error)
	}

	return nil
}
// Type represents the migration transport type. It indicates the method by which the migration can
// take place and what optional features are available.
type Type struct {
	FSType   MigrationFSType // Transport mode selected.
	Features []string        // Feature hints for selected FSType transport mode.
}
// VolumeSourceArgs represents the arguments needed to setup a volume migration source.
type VolumeSourceArgs struct {
	IndexHeaderVersion uint32   // Version of the migration index header to use.
	Name               string   // Name of the volume being migrated.
	Snapshots          []string // Names of the snapshots to include in the transfer.
	MigrationType      Type     // Negotiated transport mode and feature hints.
	TrackProgress      bool     // Whether to report transfer progress.
	MultiSync          bool     // NOTE(review): appears to enable a multi-phase sync — confirm with callers.
	FinalSync          bool     // NOTE(review): appears to mark the final sync phase — confirm with callers.
	Data               any      // Optional store to persist storage driver state between MultiSync phases.
	ContentType        string   // Content type of the volume — confirm valid values with callers.
	AllowInconsistent  bool     // NOTE(review): presumably permits transfer without consistency guarantees — confirm.
	Refresh            bool     // Whether this transfer refreshes an existing volume on the target.
	Info               *Info    // Index header information to send to the target.
	VolumeOnly         bool     // NOTE(review): presumably transfers the volume without snapshots — confirm.
	ClusterMove        bool     // NOTE(review): presumably a move between cluster members — confirm.
}
// VolumeTargetArgs represents the arguments needed to setup a volume migration sink.
type VolumeTargetArgs struct {
	IndexHeaderVersion    uint32            // Version of the migration index header expected from the source.
	Name                  string            // Name of the volume being received.
	Description           string            // Description to set on the received volume.
	Config                map[string]string // Only used for custom volume migration.
	Snapshots             []string          // Names of the snapshots being received.
	MigrationType         Type              // Negotiated transport mode and feature hints.
	TrackProgress         bool              // Whether to report transfer progress.
	Refresh               bool              // Whether to refresh an existing volume rather than create a new one.
	Live                  bool              // NOTE(review): presumably indicates a live migration — confirm with callers.
	VolumeSize            int64             // Size of the incoming volume — confirm units (bytes) with callers.
	ContentType           string            // Content type of the volume.
	VolumeOnly            bool              // NOTE(review): presumably receives the volume without snapshots — confirm.
	ClusterMoveSourceName string            // NOTE(review): presumably the source name for a cluster move — confirm.
}
// TypesToHeader converts one or more Types to a MigrationHeader. The first type supplied indicates
// the preferred migration method and is used to set the header's Fs field. When the preferred type
// is ZFS or BTRFS the corresponding optional feature hints are populated too. Additionally, if any
// of the supplied types (preferred or not) is an rsync-based transport, its features are recorded
// in the header's RsyncFeatures so that fallback negotiation can take place on the far side.
func TypesToHeader(types ...Type) *MigrationHeader {
	off := false
	on := true

	var preferred Type
	if len(types) > 0 {
		preferred = types[0]
	}

	header := MigrationHeader{Fs: &preferred.FSType}

	// Populate ZFS feature hints when ZFS is the preferred transport.
	if preferred.FSType == MigrationFSType_ZFS {
		zfs := ZfsFeatures{
			Compress: &off,
		}

		for _, f := range preferred.Features {
			switch f {
			case "compress":
				zfs.Compress = &on
			case ZFSFeatureMigrationHeader:
				zfs.MigrationHeader = &on
			case ZFSFeatureZvolFilesystems:
				zfs.HeaderZvols = &on
			}
		}

		header.ZfsFeatures = &zfs
	}

	// Populate BTRFS feature hints when BTRFS is the preferred transport.
	if preferred.FSType == MigrationFSType_BTRFS {
		btrfs := BtrfsFeatures{
			MigrationHeader:  &off,
			HeaderSubvolumes: &off,
		}

		for _, f := range preferred.Features {
			switch f {
			case BTRFSFeatureMigrationHeader:
				btrfs.MigrationHeader = &on
			case BTRFSFeatureSubvolumes:
				btrfs.HeaderSubvolumes = &on
			case BTRFSFeatureSubvolumeUUIDs:
				btrfs.HeaderSubvolumeUuids = &on
			}
		}

		header.BtrfsFeatures = &btrfs
	}

	// Record rsync features from the first rsync-capable transport found, so the
	// far side can negotiate an rsync fallback even when rsync isn't preferred.
	for _, t := range types {
		switch t.FSType {
		case MigrationFSType_RSYNC, MigrationFSType_BLOCK_AND_RSYNC, MigrationFSType_RBD_AND_RSYNC:
			// Rsync-capable, continue below.
		default:
			continue
		}

		rsync := RsyncFeatures{
			Xattrs:        &off,
			Delete:        &off,
			Compress:      &off,
			Bidirectional: &off,
		}

		for _, f := range t.Features {
			switch f {
			case "xattrs":
				rsync.Xattrs = &on
			case "delete":
				rsync.Delete = &on
			case "compress":
				rsync.Compress = &on
			case "bidirectional":
				rsync.Bidirectional = &on
			}
		}

		header.RsyncFeatures = &rsync

		break // Only use the first rsync transport type found to generate rsync features list.
	}

	return &header
}
// MatchTypes attempts to find matching migration transport types between an offered type sent from a remote
// source and the types supported by a local storage pool. If matches are found then one or more Types are
// returned containing the method and the matching optional features present in both. The function also takes a
// fallback type which is used as an additional offer type preference in case the preferred remote type is not
// compatible with the local type available. It is expected that both sides of the migration will support the
// fallback type for the volume's content type that is being migrated.
func MatchTypes(offer *MigrationHeader, fallbackType MigrationFSType, ourTypes []Type) ([]Type, error) {
	// Consider the remote's preferred type first, then the agreed fallback.
	offeredFSTypes := []MigrationFSType{offer.GetFs(), fallbackType}

	matchedTypes := []Type{}

	// Find first matching type.
	for _, ourType := range ourTypes {
		for _, offerFSType := range offeredFSTypes {
			if offerFSType != ourType.FSType {
				continue // Not a match, try the next one.
			}

			// We got a match, now extract the relevant offered features.
			var offeredFeatures []string
			switch offerFSType {
			case MigrationFSType_ZFS:
				offeredFeatures = offer.GetZfsFeaturesSlice()
			case MigrationFSType_BTRFS:
				offeredFeatures = offer.GetBtrfsFeaturesSlice()
			case MigrationFSType_RSYNC, MigrationFSType_RBD_AND_RSYNC:
				// There are other migration types using rsync like MigrationFSType_BLOCK_AND_RSYNC
				// for which we cannot set the offered features as an older LXD might ignore those
				// if the migration type is not MigrationFSType_RSYNC.
				// When both the source and target agree on MigrationFSType_BLOCK_AND_RSYNC
				// the rsync portion of the migration type isn't using any features.
				// This allows staying backwards compatible with older versions of LXD.
				offeredFeatures = offer.GetRsyncFeaturesSlice()
			}

			// Keep only the features supported on both sides.
			commonFeatures := []string{}
			for _, f := range ourType.Features {
				if shared.ValueInSlice(f, offeredFeatures) {
					commonFeatures = append(commonFeatures, f)
				}
			}

			if offer.GetRefresh() {
				// Optimized refresh requires a specific feature depending on the driver:
				// ZfsFeatureMigrationHeader for zfs, BtrfsFeatureSubvolumeUUIDs for btrfs.
				required := ""
				switch ourType.FSType {
				case MigrationFSType_ZFS:
					required = ZFSFeatureMigrationHeader
				case MigrationFSType_BTRFS:
					required = BTRFSFeatureSubvolumeUUIDs
				}

				if required != "" && !shared.ValueInSlice(required, commonFeatures) {
					continue
				}
			}

			// Append type with combined features.
			matchedTypes = append(matchedTypes, Type{
				FSType:   ourType.FSType,
				Features: commonFeatures,
			})
		}
	}

	if len(matchedTypes) < 1 {
		// No matching transport type found, generate an error with offered types and our types.
		offeredTypeStrings := make([]string, 0, len(offeredFSTypes))
		for _, offerFSType := range offeredFSTypes {
			offeredTypeStrings = append(offeredTypeStrings, offerFSType.String())
		}

		ourTypeStrings := make([]string, 0, len(ourTypes))
		for _, ourType := range ourTypes {
			ourTypeStrings = append(ourTypeStrings, ourType.FSType.String())
		}

		return matchedTypes, fmt.Errorf("No matching migration types found. Offered types: %v, our types: %v", offeredTypeStrings, ourTypeStrings)
	}

	return matchedTypes, nil
}
// progressWrapperRender formats the current progress/speed and stores it under
// the given key in the operation's metadata, updating the operation only when
// the rendered string has actually changed.
func progressWrapperRender(op *operations.Operation, key string, description string, progressInt int64, speedInt int64) {
	meta := op.Metadata()
	if meta == nil {
		meta = make(map[string]any)
	}

	// Render e.g. "10MB (2MB/s)", optionally prefixed with the description.
	done := units.GetByteSizeString(progressInt, 2)
	rate := units.GetByteSizeString(speedInt, 2)

	progress := fmt.Sprintf("%s (%s/s)", done, rate)
	if description != "" {
		progress = fmt.Sprintf("%s: %s (%s/s)", description, done, rate)
	}

	// Avoid a metadata update when nothing changed.
	if meta[key] != progress {
		meta[key] = progress

		_ = op.UpdateMetadata(meta)
	}
}
// ProgressReader reports the read progress. It returns a wrapper function that
// decorates a ReadCloser with progress tracking rendered into op's metadata
// under key; when op is nil the reader is returned unmodified.
func ProgressReader(op *operations.Operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
	return func(rc io.ReadCloser) io.ReadCloser {
		if op == nil {
			return rc
		}

		return &ioprogress.ProgressReader{
			ReadCloser: rc,
			Tracker: &ioprogress.ProgressTracker{
				Handler: func(current int64, speed int64) {
					progressWrapperRender(op, key, description, current, speed)
				},
			},
		}
	}
}
// ProgressWriter reports the write progress. It returns a wrapper function that
// decorates a WriteCloser with progress tracking rendered into op's metadata
// under key; when op is nil the writer is returned unmodified.
func ProgressWriter(op *operations.Operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
	return func(wc io.WriteCloser) io.WriteCloser {
		if op == nil {
			return wc
		}

		return &ioprogress.ProgressWriter{
			WriteCloser: wc,
			Tracker: &ioprogress.ProgressTracker{
				Handler: func(current int64, speed int64) {
					progressWrapperRender(op, key, description, current, speed)
				},
			},
		}
	}
}
// ProgressTracker returns a migration I/O tracker whose handler renders
// progress updates into op's metadata under key.
func ProgressTracker(op *operations.Operation, key string, description string) *ioprogress.ProgressTracker {
	return &ioprogress.ProgressTracker{
		Handler: func(current int64, speed int64) {
			progressWrapperRender(op, key, description, current, speed)
		},
	}
}