/
client.go
134 lines (120 loc) · 2.64 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package webseed
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/segments"
)
type RequestSpec = segments.Extent
type requestPartResult struct {
resp *http.Response
err error
}
type requestPart struct {
req *http.Request
e segments.Extent
result chan requestPartResult
}
type Request struct {
cancel func()
Result chan RequestResult
}
func (r Request) Cancel() {
r.cancel()
}
type Client struct {
HttpClient *http.Client
Url string
FileIndex segments.Index
Info *metainfo.Info
}
type RequestResult struct {
Bytes []byte
Err error
}
func (ws *Client) NewRequest(r RequestSpec) Request {
ctx, cancel := context.WithCancel(context.Background())
var requestParts []requestPart
if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
req, err := NewRequest(ws.Url, i, ws.Info, e.Start, e.Length)
if err != nil {
panic(err)
}
req = req.WithContext(ctx)
part := requestPart{
req: req,
result: make(chan requestPartResult, 1),
e: e,
}
go func() {
resp, err := ws.HttpClient.Do(req)
part.result <- requestPartResult{
resp: resp,
err: err,
}
}()
requestParts = append(requestParts, part)
return true
}) {
panic("request out of file bounds")
}
req := Request{
cancel: cancel,
Result: make(chan RequestResult, 1),
}
go func() {
b, err := readRequestPartResponses(requestParts)
req.Result <- RequestResult{
Bytes: b,
Err: err,
}
}()
return req
}
type ErrBadResponse struct {
Msg string
Response *http.Response
}
func (me ErrBadResponse) Error() string {
return me.Msg
}
func recvPartResult(buf io.Writer, part requestPart) error {
result := <-part.result
if result.err != nil {
return result.err
}
defer result.resp.Body.Close()
switch result.resp.StatusCode {
case http.StatusPartialContent:
case http.StatusOK:
if part.e.Start != 0 {
return ErrBadResponse{"got status ok but request was at offset", result.resp}
}
default:
return ErrBadResponse{
fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode),
result.resp,
}
}
copied, err := io.Copy(buf, result.resp.Body)
if err != nil {
return err
}
if copied != part.e.Length {
return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
}
return nil
}
func readRequestPartResponses(parts []requestPart) ([]byte, error) {
var buf bytes.Buffer
for _, part := range parts {
err := recvPartResult(&buf, part)
if err != nil {
return buf.Bytes(), fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err)
}
}
return buf.Bytes(), nil
}