This repository has been archived by the owner on Feb 20, 2024. It is now read-only.
/
requester.go
140 lines (120 loc) · 3.14 KB
/
requester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package blockstream
import (
"context"
"errors"
"fmt"
"io"
"github.com/ipfs/go-block-format"
"github.com/Wondertan/go-blockstream/block"
)
// requester is responsible for requesting block from a remote peer.
// It has to be paired with a responder on the other side of a conversation.
type requester struct {
rwc io.ReadWriteCloser
new, cncl chan *block.Request
rq *block.RequestQueue
ctx context.Context
cancel context.CancelFunc
}
// newRequester creates new requester.
func newRequester(ctx context.Context, rwc io.ReadWriteCloser, reqs chan *block.Request, onErr сlose) *requester {
ctx, cancel := context.WithCancel(ctx)
rcv := &requester{
rwc: rwc,
new: reqs,
cncl: make(chan *block.Request),
rq: block.NewRequestQueue(ctx.Done()),
ctx: ctx,
cancel: cancel,
}
go onErr(rcv.writeLoop)
go onErr(rcv.readLoop)
return rcv
}
// writeLoop is a long running method which asynchronously handles requests, sends them to remote responder and queues up
// for future read by readLoop. It also handles request canceling, as well as request recovering in case stream is dead.
func (r *requester) writeLoop() error {
defer r.cancel()
for {
select {
case req := <-r.new:
err := writeBlocksReq(r.rwc, req.Id(), req.Remains())
if err != nil {
select {
case r.new <- req:
case <-req.Done():
case <-r.ctx.Done():
}
return fmt.Errorf("can't writeLoop request(%d): %w", req.Id(), err)
}
log.Debugf("[Requester] Sent request %d", req.Id())
go r.onCancel(req)
r.rq.Enqueue(req)
case req := <-r.cncl:
log.Debugf("[Requester] Request %d is cancelled", req.Id())
err := writeBlocksReq(r.rwc, req.Id(), nil)
if err != nil {
return fmt.Errorf("can't cancel request(%d): %w", req.Id(), err)
}
case <-r.ctx.Done():
return r.rwc.Close()
}
}
}
// readLoop is a long running method which receives requested blocks from the remote responder and fulfills queued request.
func (r *requester) readLoop() error {
for {
id, data, reqErr, err := readBlocksResp(r.rwc)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
req := r.rq.BackPopDone()
if req == nil {
_, err := r.rwc.Read([]byte{0})
if errors.Is(err, io.EOF) {
return nil
}
return err
}
if req.Id() != id {
log.Warnf("Received Block response for wrong request(%d), skipping...", id)
continue
}
if reqErr != nil {
req.Error(reqErr)
continue
}
ids := req.Remains()
bs := make([]blocks.Block, len(data))
for i, b := range data {
bs[i], err = newBlockCheckCid(b, ids[i])
if err != nil {
if errors.Is(err, blocks.ErrWrongHash) {
log.Errorf("%s: expected: %s, received: %s", err, ids[i], bs[i])
}
return err
}
}
log.Debugf("[Requester] Received blocks for request %d", req.Id())
if !req.Fill(bs) {
log.Debugf("[Requester] Request %d is fulfilled!", req.Id())
r.rq.PopBack()
}
}
}
// onCancel handles request cancellation.
func (r *requester) onCancel(req *block.Request) {
select {
case <-req.Done():
if !req.Fulfilled() {
select {
case r.cncl <- req:
case <-r.ctx.Done():
}
}
case <-r.ctx.Done():
}
}