-
Notifications
You must be signed in to change notification settings - Fork 2
/
retrytransfers.go
126 lines (110 loc) · 3.05 KB
/
retrytransfers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package file
import (
"context"
"strings"
"time"
"github.com/Files-com/files-sdk-go/v3/file/status"
"github.com/Files-com/files-sdk-go/v3/lib/direction"
"github.com/bradfitz/iter"
)
type RetryPolicy struct {
Type RetryPolicyType
RetryCount int
Backoff int
}
func (p RetryPolicy) WaitSec(retry int) time.Duration {
if p.Backoff == 0 {
p.Backoff = 2
}
return time.Second * time.Duration(p.Backoff*retry)
}
type RetryPolicyType string
const (
RetryAll = RetryPolicyType("RetryAll")
RetryUnfinished = RetryPolicyType("RetryUnfinished")
)
func RetryByPolicy(ctx context.Context, job *Job, policy RetryPolicy, signalEvents bool) {
switch policy.Type {
case RetryAll:
RetryByStatus(ctx, job, signalEvents, policy, status.Included...)
case RetryUnfinished:
RetryByStatus(ctx, job, signalEvents, policy, append(status.Running, []status.GetStatus{status.Errored, status.Canceled}...)...)
}
}
func RetryByStatus(ctx context.Context, job *Job, signalEvents bool, policy RetryPolicy, s ...status.GetStatus) {
for i := range iter.N(policy.RetryCount) {
switch job.Direction {
case direction.DownloadType:
retryDownload(ctx, job, signalEvents, s)
case direction.UploadType:
retryUpload(ctx, job, signalEvents, s)
default:
panic("invalid direction")
}
if len(job.Sub(s...).Statuses) > 0 && i+1 != policy.RetryCount {
job.Logger.Printf("retry (%v): backing off %v sec", i+1, policy.WaitSec(i))
time.Sleep(policy.WaitSec(i))
} else {
return
}
}
}
func retryUpload(ctx context.Context, job *Job, signalEvents bool, s []status.GetStatus) {
onComplete := make(chan *UploadStatus)
defer close(onComplete)
enqueueByStatus(ctx, job, signalEvents,
func(s IFile, jobCxt context.Context) {
job.UpdateStatus(status.Retrying, s.(*UploadStatus), nil)
enqueueUpload(jobCxt, job, s.(*UploadStatus), onComplete)
}, func() {
<-onComplete
},
s...,
)
}
func retryDownload(ctx context.Context, job *Job, signalEvents bool, s []status.GetStatus) {
onComplete := make(chan *DownloadStatus)
defer close(onComplete)
enqueueByStatus(ctx, job, signalEvents,
func(s IFile, jobCxt context.Context) {
job.UpdateStatus(status.Retrying, s.(*DownloadStatus), nil)
enqueueDownload(jobCxt, job, s.(*DownloadStatus), onComplete)
}, func() {
<-onComplete
},
s...,
)
}
func enqueueByStatus(ctx context.Context, job *Job, signalEvents bool, enqueue func(IFile, context.Context), waitForComplete func(), s ...status.GetStatus) {
if job.Count(s...) == 0 {
return
}
jobCtx := job.WithContext(ctx)
count := 0
if signalEvents {
job.ClearCalled()
job.Start(false)
job.Scan()
}
files := job.Sub(s...).Statuses
var types []string
for _, st := range s {
types = append(types, st.Status().String())
}
job.Logger.Printf("retrying %v files (%v)", strings.Join(types, ", "), len(files))
for _, file := range files {
if job.FilesManager.WaitWithContext(jobCtx) {
count += 1
go enqueue(file, jobCtx)
}
}
if signalEvents {
job.EndScan()
}
for range iter.N(count) {
waitForComplete()
}
if signalEvents {
job.Finish()
}
}