-
Notifications
You must be signed in to change notification settings - Fork 70
/
urldownloader.go
201 lines (181 loc) · 4.64 KB
/
urldownloader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package urldownloader
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"sync/atomic"
	"time"

	"github.com/cenkalti/rain/internal/bufferpool"
	"github.com/cenkalti/rain/internal/piece"
)
// URLDownloader downloads files from a HTTP source.
type URLDownloader struct {
	URL                 string        // HTTP(S) source address; files may be appended to it by getURL
	Begin, End, current uint32        // piece range [Begin, End); current advances as pieces complete. End and current are accessed with sync/atomic.
	closeC, doneC       chan struct{} // closeC is closed by Close to request shutdown; doneC is closed when Run returns
}
// PieceResult wraps the downloaded piece data.
type PieceResult struct {
	Downloader *URLDownloader    // the downloader that produced this result
	Buffer     bufferpool.Buffer // piece payload; only valid when Error is nil
	Index      uint32            // index of the downloaded piece
	Error      error             // non-nil if the download failed; other fields are then unset
	Done       bool              // true when this is the final piece of the range
}
// New returns a new URLDownloader that will fetch the pieces in the
// range [begin, end) from the given source URL.
func New(source string, begin, end uint32) *URLDownloader {
	d := &URLDownloader{
		URL:     source,
		Begin:   begin,
		End:     end,
		current: begin,
		closeC:  make(chan struct{}),
		doneC:   make(chan struct{}),
	}
	return d
}
// Close signals Run to stop and blocks until it has returned.
func (d *URLDownloader) Close() {
	close(d.closeC) // ask Run to stop
	<-d.doneC       // wait for Run to finish
}
// String implements fmt.Stringer; it reports the source URL.
func (d *URLDownloader) String() string { return d.URL }
// UpdateEnd atomically replaces the end index of the piece range,
// allowing the range to change while Run is active.
func (d *URLDownloader) UpdateEnd(value uint32) {
	atomic.StoreUint32(&d.End, value)
}
// readEnd atomically loads the end index of the piece range.
func (d *URLDownloader) readEnd() uint32 {
	end := atomic.LoadUint32(&d.End)
	return end
}
// incrCurrent atomically advances the current piece index by one and
// returns the new value.
func (d *URLDownloader) incrCurrent() uint32 {
	const step = 1
	return atomic.AddUint32(&d.current, step)
}
// ReadCurrent returns the index of the piece that is currently being
// downloaded. The atomic load makes it usable from other goroutines.
func (d *URLDownloader) ReadCurrent() uint32 {
	cur := atomic.LoadUint32(&d.current)
	return cur
}
// Run downloads the pieces in range [Begin, End) from the HTTP source and
// sends one *PieceResult per completed piece (or a single error result) to
// resultC. It returns after the final piece is sent, after the first error,
// or after Close is called; doneC is closed on return so Close can unblock.
func (d *URLDownloader) Run(client *http.Client, pieces []piece.Piece, multifile bool, resultC chan interface{}, pool *bufferpool.Pool, readTimeout time.Duration) {
	defer close(d.doneC)
	// Cancel all in-flight HTTP requests when Run finishes or Close is called.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-d.doneC:
		case <-d.closeC:
		}
		cancel()
	}()
	jobs := createJobs(pieces, d.Begin, d.readEnd())
	var n int // position in piece
	// NOTE(review): d.current is read non-atomically here and below; this is
	// safe only if Run is the sole mutator (ReadCurrent uses atomic loads for
	// other goroutines) — confirm against callers.
	buf := pool.Get(int(pieces[d.current].Length))
	// processJob downloads one HTTP byte range (which may span several
	// pieces) and reports false when Run should stop.
	processJob := func(job downloadJob) bool {
		u := d.getURL(job.Filename, multifile)
		req, err := http.NewRequest(http.MethodGet, u, nil)
		if err != nil {
			// The URL is built internally; a malformed one is a programmer bug.
			panic(err)
		}
		// Request only the byte range this job covers (Range end is inclusive).
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", job.RangeBegin, job.RangeBegin+job.Length-1))
		req = req.WithContext(ctx)
		resp, err := client.Do(req)
		if err != nil {
			d.sendResult(resultC, &PieceResult{Downloader: d, Error: err})
			return false
		}
		// Runs when processJob returns, i.e. once per job, not once per Run.
		defer resp.Body.Close()
		err = checkStatus(resp)
		if err != nil {
			d.sendResult(resultC, &PieceResult{Downloader: d, Error: err})
			return false
		}
		// Abort the request (via cancel) if no read completes for readTimeout;
		// readFull re-arms this timer after every read.
		timer := time.AfterFunc(readTimeout, cancel)
		defer timer.Stop()
		var m int64 // position in response
		for m < job.Length {
			// Read the smaller of: bytes left in the piece buffer, bytes left
			// in this response.
			readSize := calcReadSize(buf, n, job, m)
			o, err := readFull(resp.Body, buf.Data[n:int64(n)+readSize], timer, readTimeout)
			if err != nil {
				d.sendResult(resultC, &PieceResult{Downloader: d, Error: err})
				return false
			}
			n += o
			m += int64(o)
			if n == len(buf.Data) { // piece completed
				index := d.current
				done := d.current >= d.readEnd()-1
				// Ownership of buf transfers to the receiver with this send.
				d.sendResult(resultC, &PieceResult{Downloader: d, Buffer: buf, Index: index, Done: done})
				if done {
					return true
				}
				d.incrCurrent()
				// Allocate new buffer for next piece
				n = 0
				buf = pool.Get(int(pieces[d.current].Length))
			}
		}
		return true
	}
	for _, job := range jobs {
		ok := processJob(job)
		if !ok {
			// The in-progress piece was never sent; return its buffer to the pool.
			buf.Release()
			break
		}
	}
}
// calcReadSize returns how many bytes to read next: the remaining space in
// the current piece buffer or the remaining bytes of the HTTP response,
// whichever is smaller.
func calcReadSize(buf bufferpool.Buffer, bufPos int, job downloadJob, jobPos int64) int64 {
	remainingPiece := int64(len(buf.Data) - bufPos)
	remainingResponse := job.Length - jobPos
	if remainingResponse < remainingPiece {
		return remainingResponse
	}
	return remainingPiece
}
// readFull is similar to io.ReadFull call, plus it resets the read timer on each iteration.
func readFull(r io.Reader, b []byte, t *time.Timer, d time.Duration) (o int, err error) {
for o < len(b) && err == nil {
var nn int
nn, err = r.Read(b[o:])
o += nn
t.Reset(d)
}
if o >= len(b) {
err = nil
}
return
}
// getURL builds the request URL for the given file.
//
// Single-file mode: the source URL itself addresses the file, unless it
// ends with a slash, in which case the escaped file name is appended.
// Multi-file mode: the escaped file name is always appended, inserting a
// slash when the source does not already end with one.
func (d *URLDownloader) getURL(filename string, multifile bool) string {
	src := d.URL
	if !multifile {
		// Fix: the original indexed src[len(src)-1] directly, which panics
		// when the source URL is empty; HasSuffix is safe on "".
		if strings.HasSuffix(src, "/") {
			src += url.PathEscape(filename)
		}
		return src
	}
	if !strings.HasSuffix(src, "/") {
		src += "/"
	}
	return src + url.PathEscape(filename)
}
// sendResult delivers res to resultC, dropping it if the downloader is
// being closed so Run never blocks forever on an abandoned channel.
func (d *URLDownloader) sendResult(resultC chan interface{}, res *PieceResult) {
	// Fast path: if Close has already been called, drop the result now
	// rather than racing the two cases of the blocking select below.
	select {
	case <-d.closeC:
		return
	default:
	}
	// Block until the receiver takes the result or Close is called.
	select {
	case resultC <- res:
	case <-d.closeC:
	}
}
func checkStatus(resp *http.Response) error {
switch resp.StatusCode {
case 200, 206:
return nil
default:
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
}