This repository has been archived by the owner on Feb 20, 2024. It is now read-only.
/
request.go
107 lines (89 loc) · 2.03 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package block
import (
"context"
"io"
"sync/atomic"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)
// Request tracks one request for a set of blocks: the CIDs that were asked
// for, how many blocks have been delivered so far, and the channel through
// which delivered batches reach the consumer.
type Request struct {
	// id is the identifier given to NewRequest and reported by Id.
	id uint32
	// fld counts the blocks accepted by Fill. It is accessed through
	// sync/atomic only.
	fld uint32

	// bs carries delivered batches from Fill to Next.
	bs chan []blocks.Block
	// ids lists the CIDs being requested.
	ids []cid.Cid
	// err is the error recorded by Error; Next returns it once the Request
	// is done and no buffered batches remain.
	err error

	// done is closed when the Request's context is cancelled.
	done <-chan struct{}
	// cancel cancels the Request's context, which closes done.
	cancel context.CancelFunc
}
// NewRequest builds a Request identified by id for the given CIDs.
//
// A cancellable child of ctx is derived for the Request, so it finishes when
// ctx is cancelled or when Cancel is called. The channel that carries
// delivered batches is buffered to half the number of requested ids.
func NewRequest(ctx context.Context, id uint32, ids []cid.Cid) *Request {
	reqCtx, stop := context.WithCancel(ctx)
	return &Request{
		id:     id,
		ids:    ids,
		bs:     make(chan []blocks.Block, len(ids)/2),
		done:   reqCtx.Done(),
		cancel: stop,
	}
}
// Id returns the identifier the Request was created with.
//
// Go style would spell this method ID; the existing name is kept so that
// callers keep compiling.
func (req *Request) Id() uint32 {
return req.id
}
// Fulfilled reports whether every requested block has been delivered.
//
// The delivered count is compared with >= rather than ==: Fill adds the
// length of each accepted batch without checking it against the number of
// blocks still missing, so the counter can exceed len(ids). Such a Request
// is still complete.
func (req *Request) Fulfilled() bool {
	return atomic.LoadUint32(&req.fld) >= uint32(len(req.ids))
}
// Done returns a channel that is closed once the Request is finished, that
// is, when its context is cancelled either through Cancel or through the
// parent context passed to NewRequest.
func (req *Request) Done() <-chan struct{} {
return req.done
}
// Cancel finishes the Request by cancelling its context, which closes the
// channel returned by Done. As with any context.CancelFunc, calls after the
// first one do nothing.
func (req *Request) Cancel() {
req.cancel()
}
// Remains returns the ids that still have to be delivered for the Request to
// become fulfilled. The result shares its backing array with the Request's
// id list and is empty once the Request is fulfilled.
//
// The delivered count is used as an offset into the id list.
// NOTE(review): this presumes blocks arrive in request order — confirm
// against the callers of Fill.
func (req *Request) Remains() []cid.Cid {
	filled := atomic.LoadUint32(&req.fld)
	// Fill does not bound the counter by len(ids); clamp it so that an
	// over-delivered Request yields an empty slice instead of panicking with
	// a slice-bounds error.
	if total := uint32(len(req.ids)); filled > total {
		filled = total
	}
	return req.ids[filled:]
}
// Next blocks until a batch of blocks is available or the Request is done.
//
// Batches that are still buffered when the Request finishes are returned
// first. Once none are left, Next returns the error recorded by Error, or
// io.EOF if the Request finished without one.
func (req *Request) Next() ([]blocks.Block, error) {
	select {
	case batch := <-req.bs:
		return batch, nil
	case <-req.Done():
		// Finished; fall through to drain whatever is still buffered.
	}

	select {
	case batch := <-req.bs:
		return batch, nil
	default:
	}

	if req.err != nil {
		return nil, req.err
	}
	return nil, io.EOF
}
// Fill hands a batch of delivered blocks to the Request.
//
// It blocks until the batch is accepted into the Request's channel or the
// Request is done. It returns true when the batch was accepted and more
// blocks are still expected. It returns false when bs is nil, when the
// Request was already done, or when this batch completed the Request, in
// which case the Request is cancelled.
func (req *Request) Fill(bs []blocks.Block) bool {
	if bs == nil {
		return false
	}

	select {
	case req.bs <- bs:
	case <-req.Done():
		return false
	}

	// Use >= rather than ==: a batch larger than the number of blocks still
	// missing pushes the counter past len(ids), and with == the Request
	// would then never be cancelled, leaving the consumer blocked in Next.
	if atomic.AddUint32(&req.fld, uint32(len(bs))) >= uint32(len(req.ids)) {
		req.Cancel()
		return false
	}
	return true
}
// Error finishes the Request with err. Next reports err once no buffered
// batches remain. A nil err, or a Request that is already done, makes the
// call a no-op.
//
// NOTE(review): the done check and the write to err are not atomic, so
// concurrent calls to Error look racy — confirm that callers serialize them.
func (req *Request) Error(err error) {
	if err == nil {
		return
	}

	select {
	case <-req.Done():
		// Already finished; keep the outcome that was recorded first.
		return
	default:
	}

	req.err = err
	req.Cancel()
}