-
Notifications
You must be signed in to change notification settings - Fork 2
/
replication_logic.go
658 lines (578 loc) · 19.4 KB
/
replication_logic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
package logic
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/dsh2dsh/zrepl/daemon/logging/trace"
"github.com/dsh2dsh/zrepl/logger"
"github.com/dsh2dsh/zrepl/replication/driver"
. "github.com/dsh2dsh/zrepl/replication/logic/diff"
"github.com/dsh2dsh/zrepl/replication/logic/pdu"
"github.com/dsh2dsh/zrepl/replication/report"
"github.com/dsh2dsh/zrepl/util/bytecounter"
"github.com/dsh2dsh/zrepl/util/chainlock"
"github.com/dsh2dsh/zrepl/util/semaphore"
"github.com/dsh2dsh/zrepl/zfs"
)
// Endpoint represents one side of the replication.
//
// An endpoint is either in Sender or Receiver mode, represented by the correspondingly
// named interfaces defined in this package.
type Endpoint interface {
	// ListFilesystems returns the filesystems exposed by this endpoint.
	// Does not include placeholder filesystems
	ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
	// ListFilesystemVersions returns the versions (snapshots/bookmarks) of
	// the filesystem named in req.
	ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)
	// DestroySnapshots destroys the snapshots named in req on this endpoint.
	DestroySnapshots(ctx context.Context, req *pdu.DestroySnapshotsReq) (*pdu.DestroySnapshotsRes, error)
	// WaitForConnectivity blocks until the endpoint is reachable or ctx is
	// canceled; it returns nil iff the endpoint responded.
	WaitForConnectivity(ctx context.Context) error
}
// Sender is the sending side of the replication. In addition to the common
// Endpoint methods it can produce send streams, estimate their size, be
// notified of completed transfers, and report its replication cursor.
type Sender interface {
	Endpoint
	// If a non-nil io.ReadCloser is returned, it is guaranteed to be closed before
	// any next call to the parent github.com/dsh2dsh/zrepl/replication.Endpoint.
	// If the send request is for dry run the io.ReadCloser will be nil
	Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.ReadCloser, error)
	// SendDry performs a dry-run send for r, used to obtain the expected
	// stream size without transferring data (see Step.updateSizeEstimate).
	SendDry(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, error)
	// SendCompleted informs the sender that the stream for the original
	// request r has been fully received by the receiver.
	SendCompleted(ctx context.Context, r *pdu.SendCompletedReq) (*pdu.SendCompletedRes, error)
	// ReplicationCursor returns the sender's replication cursor for the
	// filesystem named in req.
	ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
}
// Receiver is the receiving side of the replication: it consumes the ZFS
// send stream produced by a Sender.
type Receiver interface {
	Endpoint
	// Receive sends r and sendStream (the latter containing a ZFS send stream)
	// to the parent github.com/dsh2dsh/zrepl/replication.Endpoint.
	Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.ReadCloser) (*pdu.ReceiveRes, error)
}
// Planner plans replication between one Sender and one Receiver according
// to a PlannerPolicy. It implements the planning interface consumed by the
// replication driver (see Plan / WaitForConnectivity).
type Planner struct {
	sender   Sender
	receiver Receiver
	policy   PlannerPolicy

	promSecsPerState    *prometheus.HistogramVec // labels: state
	promBytesReplicated *prometheus.CounterVec   // labels: filesystem
}
// Plan computes the set of filesystems to replicate and returns them as
// driver.FS values for consumption by the replication driver.
func (p *Planner) Plan(ctx context.Context) ([]driver.FS, error) {
	filesystems, err := p.doPlanning(ctx)
	if err != nil {
		return nil, err
	}
	// Box the concrete *Filesystem values into the driver.FS interface.
	result := make([]driver.FS, 0, len(filesystems))
	for _, fs := range filesystems {
		result = append(result, fs)
	}
	return result, nil
}
// WaitForConnectivity pings sender and receiver concurrently and blocks
// until both have answered. It returns nil iff both endpoints are
// reachable; otherwise the error names the unreachable side(s).
func (p *Planner) WaitForConnectivity(ctx context.Context) error {
	var wg sync.WaitGroup
	doPing := func(endpoint Endpoint, errOut *error) {
		defer wg.Done()
		ctx, endTask := trace.WithTaskFromStack(ctx)
		defer endTask()
		// WaitForConnectivity returns nil on success, so assigning the
		// result directly covers both the success and failure case.
		*errOut = endpoint.WaitForConnectivity(ctx)
	}
	wg.Add(2)
	var senderErr, receiverErr error
	go doPing(p.sender, &senderErr)
	go doPing(p.receiver, &receiverErr)
	wg.Wait()
	switch {
	case senderErr == nil && receiverErr == nil:
		return nil
	case senderErr != nil && receiverErr != nil:
		if senderErr.Error() == receiverErr.Error() {
			// Identical messages usually indicate a common cause (e.g. the
			// local network is down); don't repeat the message twice.
			return fmt.Errorf("sender and receiver are not reachable: %s", senderErr.Error())
		}
		return fmt.Errorf("sender and receiver are not reachable:\n sender: %s\n receiver: %s", senderErr, receiverErr)
	case senderErr != nil:
		return fmt.Errorf("%s is not reachable: %s", "sender", senderErr)
	default:
		return fmt.Errorf("%s is not reachable: %s", "receiver", receiverErr)
	}
}
// Filesystem is the per-filesystem replication plan/state; it implements
// driver.FS. Instances are created by Planner.doPlanning, one per sender
// filesystem.
type Filesystem struct {
	sender   Sender
	receiver Receiver
	policy   PlannerPolicy // immutable, its .ReplicationConfig member is a pointer and copied into messages

	Path string // compat

	// receiverFS may be nil, senderFS never nil
	receiverFS, senderFS *pdu.Filesystem

	promBytesReplicated prometheus.Counter // compat

	// sizeEstimateRequestSem bounds concurrent dry-run size-estimate
	// requests; it is shared by all filesystems of one planning run.
	sizeEstimateRequestSem *semaphore.S
}
// EqualToPreviousAttempt reports whether other refers to the same
// filesystem as f, i.e. whether a previous replication attempt targeted
// the same dataset path.
func (f *Filesystem) EqualToPreviousAttempt(other driver.FS) bool {
	// TODO: use GUIDs (issued by zrepl, not those from ZFS)
	if g, ok := other.(*Filesystem); ok {
		return f.Path == g.Path
	}
	return false
}
// PlanFS computes the replication steps for this filesystem and returns
// them as driver.Step values. With oneStep set, consecutive incremental
// steps are collapsed where possible (see doPlanning).
func (f *Filesystem) PlanFS(ctx context.Context, oneStep bool) ([]driver.Step,
	error,
) {
	planned, err := f.doPlanning(ctx, oneStep)
	if err != nil {
		return nil, err
	}
	// Box the concrete *Step values into the driver.Step interface.
	out := make([]driver.Step, 0, len(planned))
	for _, step := range planned {
		out = append(out, step)
	}
	return out, nil
}
// ReportInfo returns the status-report metadata for this filesystem.
func (f *Filesystem) ReportInfo() *report.FilesystemInfo {
	return &report.FilesystemInfo{Name: f.Path} // FIXME compat name
}
// Step is one replication step: transferring the delta between two
// filesystem versions (or a full send when from is nil) from sender to
// receiver. It implements driver.Step.
type Step struct {
	sender   Sender
	receiver Receiver
	parent   *Filesystem

	from, to    *pdu.FilesystemVersion // from may be nil, indicating full send
	resumeToken string                 // empty means no resume token shall be used

	expectedSize uint64 // 0 means no size estimate present / possible

	// byteCounter is nil initially, and set later in Step.doReplication
	// => concurrent read of that pointer from Step.ReportInfo must be protected
	byteCounter    bytecounter.ReadCloser
	byteCounterMtx chainlock.L
}
// TargetEquals reports whether other replicates the same (from, to)
// version pair as s, compared by GUID. Both steps must belong to the same
// filesystem; violating that invariant is a programming error and panics.
func (s *Step) TargetEquals(other driver.Step) bool {
	t, ok := other.(*Step)
	if !ok {
		return false
	}
	if !s.parent.EqualToPreviousAttempt(t.parent) {
		panic("Step interface promise broken: parent filesystems must be same")
	}
	sameFrom := s.from.GetGuid() == t.from.GetGuid()
	sameTo := s.to.GetGuid() == t.to.GetGuid()
	return sameFrom && sameTo
}
// TargetDate returns the creation time of the step's target snapshot.
func (s *Step) TargetDate() time.Time {
	return s.to.SnapshotTime() // FIXME compat name
}
// Step executes this replication step (send + receive + completion
// notification).
func (s *Step) Step(ctx context.Context) error {
	return s.doReplication(ctx)
}
// ReportInfo assembles a progress snapshot of this step for status
// reporting.
func (s *Step) ReportInfo() *report.StepInfo {
	// Snapshot the byte counter under the lock: doReplication sets the
	// pointer concurrently.
	var transferred uint64
	s.byteCounterMtx.Lock()
	if s.byteCounter != nil {
		transferred = s.byteCounter.Count()
	}
	s.byteCounterMtx.Unlock()

	// from stays empty for a full send (s.from == nil).
	var fromName string
	if s.from != nil {
		fromName = s.from.RelName()
	}
	return &report.StepInfo{
		From:            fromName,
		To:              s.to.RelName(),
		Resumed:         s.resumeToken != "",
		BytesExpected:   s.expectedSize,
		BytesReplicated: transferred,
	}
}
// NewPlanner returns a Planner for the given endpoint pair and policy,
// wiring up the provided Prometheus metrics.
//
// caller must ensure policy.Validate() == nil; an invalid policy panics.
func NewPlanner(secsPerState *prometheus.HistogramVec, bytesReplicated *prometheus.CounterVec, sender Sender, receiver Receiver, policy PlannerPolicy) *Planner {
	if err := policy.Validate(); err != nil {
		panic(err)
	}
	return &Planner{
		sender:              sender,
		receiver:            receiver,
		policy:              policy,
		promSecsPerState:    secsPerState,
		promBytesReplicated: bytesReplicated,
	}
}
// tryAutoresolveConflict attempts to resolve a planning conflict according
// to the configured conflict-resolution policy.
//
// It returns (nil, nil) when replication is a no-op, (path, nil) when the
// conflict could be auto-resolved into a replication path (path[0] == nil
// denotes a full send), and (nil, conflict) when it cannot be resolved.
// Only the "no common ancestor with an empty receiver" case — i.e. initial
// replication — is auto-resolvable.
func tryAutoresolveConflict(conflict error, policy ConflictResolution) (path []*pdu.FilesystemVersion, reason error) {
	if _, ok := conflict.(*ConflictMostRecentSnapshotAlreadyPresent); ok {
		// replication is a no-op
		return nil, nil
	}
	noCommonAncestor, ok := conflict.(*ConflictNoCommonAncestor)
	if !ok || len(noCommonAncestor.SortedReceiverVersions) != 0 {
		// Not an initial-replication situation; hand the conflict back.
		return nil, conflict
	}
	senderVersions := noCommonAncestor.SortedSenderVersions
	if len(senderVersions) == 0 {
		return nil, errors.New("no snapshots available on sender side")
	}
	switch policy.InitialReplication {
	case InitialReplicationAutoResolutionMostRecent:
		// Full send of only the most recent sender snapshot (bookmarks are
		// not eligible as a full-send target).
		var mostRecentSnap *pdu.FilesystemVersion
		for n := len(senderVersions) - 1; n >= 0; n-- {
			if senderVersions[n].Type == pdu.FilesystemVersion_Snapshot {
				mostRecentSnap = senderVersions[n]
				break
			}
		}
		return []*pdu.FilesystemVersion{nil, mostRecentSnap}, nil
	case InitialReplicationAutoResolutionAll:
		// Full send of the oldest snapshot, then incrementals covering every
		// subsequent snapshot (bookmarks are skipped).
		path = append(path, nil)
		for n := 0; n < len(senderVersions); n++ {
			if senderVersions[n].Type == pdu.FilesystemVersion_Snapshot {
				path = append(path, senderVersions[n])
			}
		}
		return path, nil
	case InitialReplicationAutoResolutionFail:
		return nil, errors.New("automatic conflict resolution for initial replication is disabled in config")
	default:
		panic(fmt.Sprintf("unimplemented: %#v", policy.InitialReplication))
	}
}
// doPlanning lists the filesystems on both endpoints and pairs each sender
// filesystem with its receiver counterpart (nil if the receiver does not
// have it), producing one *Filesystem planning unit per sender filesystem.
func (p *Planner) doPlanning(ctx context.Context) ([]*Filesystem, error) {
	log := getLogger(ctx)
	log.Info("start planning")

	slfssres, err := p.sender.ListFilesystems(ctx, &pdu.ListFilesystemReq{})
	if err != nil {
		log.WithError(err).WithField("errType", fmt.Sprintf("%T", err)).Error("error listing sender filesystems")
		return nil, err
	}
	sfss := slfssres.GetFilesystems()

	rlfssres, err := p.receiver.ListFilesystems(ctx, &pdu.ListFilesystemReq{})
	if err != nil {
		log.WithError(err).WithField("errType", fmt.Sprintf("%T", err)).Error("error listing receiver filesystems")
		return nil, err
	}
	// Index receiver filesystems by path. A later duplicate overwrites an
	// earlier one, which matches the original linear-scan semantics.
	receiverByPath := make(map[string]*pdu.Filesystem)
	for _, rfs := range rlfssres.GetFilesystems() {
		receiverByPath[rfs.Path] = rfs
	}

	// One shared semaphore bounds concurrent size-estimate requests across
	// all filesystems of this planning run.
	sizeEstimateRequestSem := semaphore.New(int64(p.policy.SizeEstimationConcurrency))

	q := make([]*Filesystem, 0, len(sfss))
	for _, fs := range sfss {
		var ctr prometheus.Counter
		if p.promBytesReplicated != nil {
			ctr = p.promBytesReplicated.WithLabelValues(fs.Path)
		}
		q = append(q, &Filesystem{
			sender:                 p.sender,
			receiver:               p.receiver,
			policy:                 p.policy,
			Path:                   fs.Path,
			senderFS:               fs,
			receiverFS:             receiverByPath[fs.Path], // nil if absent
			promBytesReplicated:    ctr,
			sizeEstimateRequestSem: sizeEstimateRequestSem,
		})
	}
	return q, nil
}
// doPlanning computes the list of replication steps for this filesystem.
//
// If the receiver carries a resume token, planning prefers resuming that
// partially-transferred step and continues with incrementals between the
// sender's remaining snapshots. Otherwise an incremental path is computed
// from both version lists, auto-resolving conflicts per policy where
// possible. With oneStep set, consecutive incremental steps are collapsed
// into a single snapshot-to-snapshot step. Finally a dry-run size estimate
// is computed for every step, bounded by fs.sizeEstimateRequestSem.
func (fs *Filesystem) doPlanning(ctx context.Context, oneStep bool) ([]*Step,
	error,
) {
	log := func(ctx context.Context) logger.Logger {
		return getLogger(ctx).WithField("filesystem", fs.Path)
	}
	log(ctx).Debug("assessing filesystem")
	sfsvsres, err := fs.sender.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: fs.Path})
	if err != nil {
		log(ctx).WithError(err).Error("cannot get remote filesystem versions")
		return nil, err
	}
	sfsvs := sfsvsres.GetVersions()
	if len(sfsvs) < 1 {
		err := errors.New("sender does not have any versions")
		log(ctx).Error(err.Error())
		return nil, err
	}
	// Placeholder receiver filesystems are treated as having no versions.
	var rfsvs []*pdu.FilesystemVersion
	if fs.receiverFS != nil && !fs.receiverFS.GetIsPlaceholder() {
		rfsvsres, err := fs.receiver.ListFilesystemVersions(ctx, &pdu.ListFilesystemVersionsReq{Filesystem: fs.Path})
		if err != nil {
			log(ctx).WithError(err).Error("receiver error")
			return nil, err
		}
		rfsvs = rfsvsres.GetVersions()
	} else {
		rfsvs = []*pdu.FilesystemVersion{}
	}
	var resumeToken *zfs.ResumeToken
	var resumeTokenRaw string
	if fs.receiverFS != nil && fs.receiverFS.ResumeToken != "" {
		resumeTokenRaw = fs.receiverFS.ResumeToken // shadow
		log(ctx).WithField("receiverFS.ResumeToken", resumeTokenRaw).Debug("decode receiver fs resume token")
		resumeToken, err = zfs.ParseResumeToken(ctx, resumeTokenRaw) // shadow
		if err != nil {
			// TODO in theory, we could do replication without resume token, but that would mean that
			// we need to discard the resumable state on the receiver's side.
			// Would be easy by setting UsedResumeToken=false in the RecvReq ...
			// FIXME / CHECK semantics UsedResumeToken if SendReq.ResumeToken == ""
			log(ctx).WithError(err).Error("cannot decode resume token, aborting")
			return nil, err
		}
		log(ctx).WithField("token", resumeToken).Debug("decode resume token")
	}
	var steps []*Step
	// build the list of replication steps
	//
	// prefer to resume any started replication instead of starting over with a normal IncrementalPath
	//
	// look for the step encoded in the resume token in the sender's version
	// if we find that step:
	//   1. use it as first step (including resume token)
	//   2. compute subsequent steps by computing incremental path from the token.To version on
	// ...
	// that's actually equivalent to simply cutting off earlier versions from rfsvs and sfsvs
	if resumeToken != nil {
		sfsvs := SortVersionListByCreateTXGThenBookmarkLTSnapshot(sfsvs)
		var fromVersion, toVersion *pdu.FilesystemVersion
		var toVersionIdx int
		for idx, sfsv := range sfsvs {
			if resumeToken.HasFromGUID && sfsv.Guid == resumeToken.FromGUID {
				if fromVersion != nil && fromVersion.Type == pdu.FilesystemVersion_Snapshot {
					// prefer snapshots over bookmarks for size estimation
				} else {
					fromVersion = sfsv
				}
			}
			if resumeToken.HasToGUID && sfsv.Guid == resumeToken.ToGUID && sfsv.Type == pdu.FilesystemVersion_Snapshot {
				// `toVersion` must always be a snapshot
				toVersion, toVersionIdx = sfsv, idx
			}
		}
		if toVersion == nil {
			return nil, fmt.Errorf("resume token `toguid` = %v not found on sender (`toname` = %q)", resumeToken.ToGUID, resumeToken.ToName)
		} else if fromVersion == toVersion {
			return nil, fmt.Errorf("resume token `fromguid` and `toguid` match same version on sender")
		}
		// fromVersion may be nil, toVersion is not nil, encryption matches
		// good to go this one step!
		resumeStep := &Step{
			parent:      fs,
			sender:      fs.sender,
			receiver:    fs.receiver,
			from:        fromVersion,
			to:          toVersion,
			resumeToken: resumeTokenRaw,
		}
		// by definition, the resume token _must_ be the receiver's most recent version, if they have any
		// don't bother checking, zfs recv will produce an error if above assumption is wrong
		//
		// thus, subsequent steps are just incrementals on the sender's remaining _snapshots_ (not bookmarks)
		var remainingSFSVs []*pdu.FilesystemVersion
		for _, sfsv := range sfsvs[toVersionIdx:] {
			if sfsv.Type == pdu.FilesystemVersion_Snapshot {
				remainingSFSVs = append(remainingSFSVs, sfsv)
			}
		}
		if oneStep && len(remainingSFSVs) > 1 {
			// collapse all remaining incrementals into one span step
			steps = []*Step{resumeStep, {
				parent:   fs,
				sender:   fs.sender,
				receiver: fs.receiver,
				from:     remainingSFSVs[0],
				to:       remainingSFSVs[len(remainingSFSVs)-1],
			}}
		} else {
			steps = make([]*Step, 0, len(remainingSFSVs)) // shadow
			steps = append(steps, resumeStep)
			for i := 0; i < len(remainingSFSVs)-1; i++ {
				steps = append(steps, &Step{
					parent:   fs,
					sender:   fs.sender,
					receiver: fs.receiver,
					from:     remainingSFSVs[i],
					to:       remainingSFSVs[i+1],
				})
			}
		}
	} else { // resumeToken == nil
		path, conflict := IncrementalPath(rfsvs, sfsvs)
		if conflict != nil {
			updPath, updConflict := tryAutoresolveConflict(conflict,
				*fs.policy.ConflictResolution)
			if updConflict == nil {
				log(ctx).WithField("conflict", conflict).Info(
					"conflict automatically resolved")
			} else {
				log(ctx).WithField("conflict", conflict).Error(
					"cannot resolve conflict")
			}
			path, conflict = updPath, updConflict
		}
		if conflict != nil {
			return nil, conflict
		}
		switch {
		case len(path) == 0:
			steps = nil
		case len(path) == 1:
			panic(fmt.Sprintf("len(path) must be two for incremental repl, and initial repl must start with nil, got path[0]=%#v", path[0]))
		default:
			steps = make([]*Step, 0, len(path)) // shadow
			for i := 0; i < len(path)-1; i++ {
				step := &Step{
					parent:   fs,
					sender:   fs.sender,
					receiver: fs.receiver,
					from:     path[i], // nil in case of initial repl
					to:       path[i+1],
				}
				steps = append(steps, step)
				// BUGFIX: guard against nil before the field access — path[0]
				// is nil for the initial replication step (see `from` above),
				// and `step.from.Type` would panic on it.
				if oneStep && step.from != nil && step.from.Type == pdu.FilesystemVersion_Snapshot {
					// collapse the remaining incrementals into this step
					step.to = path[len(path)-1]
					break
				}
			}
		}
	}
	if len(steps) == 0 {
		log(ctx).Info("planning determined that no replication steps are required")
	}
	log(ctx).Debug("compute send size estimate")
	// Fan out one dry-run size-estimate request per step, bounded by the
	// shared semaphore; the first error cancels the remaining requests.
	errs := make(chan error, len(steps))
	fanOutCtx, fanOutCancel := context.WithCancel(ctx)
	taskCtx, fanOutAdd, fanOutWait := trace.WithTaskGroup(fanOutCtx,
		"compute-size-estimate")
	defer fanOutCancel()
	for _, step := range steps {
		// TODO: instead of the semaphore, rely on resource-exhaustion signaled by
		// the remote endpoint to limit size-estimate requests. Send is handled
		// over rpc/dataconn ATM, which doesn't support the resource exhaustion
		// status codes that gRPC defines.
		guard, err := fs.sizeEstimateRequestSem.Acquire(taskCtx)
		if err != nil {
			fanOutCancel()
			break
		}
		fanOutAdd(func(ctx context.Context) {
			defer guard.Release()
			err := step.updateSizeEstimate(ctx)
			if err != nil {
				log(ctx).WithError(err).WithField("step", step).Error(
					"error computing size estimate")
				fanOutCancel()
			}
			errs <- err
		})
	}
	fanOutWait()
	close(errs)
	// Prefer a concrete error over context.Canceled: cancellation is usually
	// just a consequence of the error that triggered fanOutCancel.
	var significantErr error
	for err := range errs {
		if err != nil {
			if significantErr == nil || significantErr == context.Canceled {
				significantErr = err
			}
		}
	}
	if significantErr != nil {
		return nil, significantErr
	}
	log(ctx).Debug("filesystem planning finished")
	return steps, nil
}
// updateSizeEstimate performs a dry-run send for this step and stores the
// sender's expected stream size in s.expectedSize.
func (s *Step) updateSizeEstimate(ctx context.Context) error {
	sr := s.buildSendRequest()
	log := getLogger(ctx)
	log.Debug("initiate dry run send request")
	sres, err := s.sender.SendDry(ctx, sr)
	if err != nil {
		log.WithError(err).Error("dry run send request failed")
		return err
	}
	if sres == nil {
		// Defensive: a nil result without an error is a broken endpoint.
		err := errors.New("got nil send result")
		log.WithError(err).Error("dry run send request failed")
		return err
	}
	s.expectedSize = sres.GetExpectedSize()
	return nil
}
// buildSendRequest assembles the send request for this step's (from, to)
// pair, including the resume token if one was decoded during planning.
func (s *Step) buildSendRequest() *pdu.SendReq {
	return &pdu.SendReq{
		Filesystem:        s.parent.Path,
		From:              s.from, // may be nil
		To:                s.to,
		ResumeToken:       s.resumeToken,
		ReplicationConfig: s.parent.policy.ReplicationConfig,
	}
}
// doReplication executes this step: it obtains the send stream from the
// sender, pipes it into the receiver while counting transferred bytes for
// progress reporting, and finally notifies the sender that replication
// completed.
func (s *Step) doReplication(ctx context.Context) error {
	fs := s.parent.Path
	log := getLogger(ctx).WithField("filesystem", fs)
	sr := s.buildSendRequest()

	log.Debug("initiate send request")
	sres, stream, err := s.sender.Send(ctx, sr)
	if err != nil {
		log.WithError(err).Error("send request failed")
		return err
	}
	if sres == nil {
		err := errors.New("send request returned nil send result")
		log.Error(err.Error())
		return err
	}
	if stream == nil {
		// This is not a dry-run request, so a missing stream is a bug in the
		// endpoint implementation. Log it like the other failure paths.
		err := errors.New("send request did not return a stream, broken endpoint implementation")
		log.Error(err.Error())
		return err
	}
	defer stream.Close()

	// Install a byte counter to track progress + for status report
	byteCountingStream := bytecounter.NewReadCloser(stream)
	s.byteCounterMtx.Lock()
	s.byteCounter = byteCountingStream
	s.byteCounterMtx.Unlock()
	defer func() {
		defer s.byteCounterMtx.Lock().Unlock()
		if s.parent.promBytesReplicated != nil {
			s.parent.promBytesReplicated.Add(float64(s.byteCounter.Count()))
		}
	}()

	rr := &pdu.ReceiveReq{
		Filesystem:        fs,
		To:                sr.GetTo(),
		ClearResumeToken:  !sres.UsedResumeToken,
		ReplicationConfig: s.parent.policy.ReplicationConfig,
	}
	log.Debug("initiate receive request")
	if _, err := s.receiver.Receive(ctx, rr, byteCountingStream); err != nil {
		log.
			WithError(err).
			WithField("errType", fmt.Sprintf("%T", err)).
			WithField("rr", fmt.Sprintf("%v", rr)).
			Error("receive request failed (might also be error on sender)")
		// This failure could be due to
		// - an unexpected exit of ZFS on the sending side
		// - an unexpected exit of ZFS on the receiving side
		// - a connectivity issue
		return err
	}
	log.Debug("receive finished")

	log.Debug("tell sender replication completed")
	if _, err := s.sender.SendCompleted(ctx, &pdu.SendCompletedReq{
		OriginalReq: sr,
	}); err != nil {
		log.WithError(err).Error("error telling sender that replication completed successfully")
		return err
	}
	return nil
}
// String renders the step for logging: incremental sends are shown as
// "path(from => to)", full sends as "path<to> (full)".
func (s *Step) String() string {
	// FIXME: ZFS semantics are that to is nil on non-incremental send
	if s.from != nil {
		return fmt.Sprintf("%s(%s => %s)", s.parent.Path, s.from.RelName(), s.to.RelName())
	}
	return fmt.Sprintf("%s%s (full)", s.parent.Path, s.to.RelName())
}