forked from ipfs/go-graphsync
/
messages.go
139 lines (117 loc) · 3.32 KB
/
messages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package requestmanager
import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p/core/peer"
"go.opentelemetry.io/otel/trace"
graphsync "github.com/filecoin-project/boost-graphsync"
gsmsg "github.com/filecoin-project/boost-graphsync/message"
"github.com/filecoin-project/boost-graphsync/peerstate"
"github.com/filecoin-project/boost-graphsync/requestmanager/executor"
)
type updateRequestMessage struct {
id graphsync.RequestID
extensions []graphsync.ExtensionData
response chan<- error
}
func (urm *updateRequestMessage) handle(rm *RequestManager) {
err := rm.update(urm.id, urm.extensions)
select {
case <-rm.ctx.Done():
case urm.response <- err:
}
}
type pauseRequestMessage struct {
id graphsync.RequestID
response chan<- error
}
func (prm *pauseRequestMessage) handle(rm *RequestManager) {
err := rm.pause(prm.id)
select {
case <-rm.ctx.Done():
case prm.response <- err:
}
}
type unpauseRequestMessage struct {
id graphsync.RequestID
extensions []graphsync.ExtensionData
response chan<- error
}
func (urm *unpauseRequestMessage) handle(rm *RequestManager) {
err := rm.unpause(urm.id, urm.extensions)
select {
case <-rm.ctx.Done():
case urm.response <- err:
}
}
type processResponsesMessage struct {
p peer.ID
responses []gsmsg.GraphSyncResponse
blks []blocks.Block
}
func (prm *processResponsesMessage) handle(rm *RequestManager) {
rm.processResponses(prm.p, prm.responses, prm.blks)
}
type cancelRequestMessage struct {
requestID graphsync.RequestID
onTerminated chan error
terminalError error
}
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
rm.cancelRequest(crm.requestID, crm.onTerminated, crm.terminalError)
}
type getRequestTaskMessage struct {
p peer.ID
task *peertask.Task
requestExecutionChan chan executor.RequestTask
}
func (irm *getRequestTaskMessage) handle(rm *RequestManager) {
requestExecution := rm.getRequestTask(irm.p, irm.task)
select {
case <-rm.ctx.Done():
case irm.requestExecutionChan <- requestExecution:
}
}
type releaseRequestTaskMessage struct {
p peer.ID
task *peertask.Task
err error
done chan struct{}
}
func (trm *releaseRequestTaskMessage) handle(rm *RequestManager) {
rm.releaseRequestTask(trm.p, trm.task, trm.err)
select {
case <-rm.ctx.Done():
case trm.done <- struct{}{}:
}
}
type newRequestMessage struct {
requestID graphsync.RequestID
span trace.Span
p peer.ID
root ipld.Link
selector ipld.Node
extensions []graphsync.ExtensionData
inProgressRequestChan chan<- inProgressRequest
}
func (nrm *newRequestMessage) handle(rm *RequestManager) {
var ipr inProgressRequest
ipr.request, ipr.incoming, ipr.incomingError = rm.newRequest(nrm.requestID, nrm.span, nrm.p, nrm.root, nrm.selector, nrm.extensions)
ipr.requestID = ipr.request.ID()
select {
case nrm.inProgressRequestChan <- ipr:
case <-rm.ctx.Done():
}
}
type peerStateMessage struct {
p peer.ID
peerStatsChan chan<- peerstate.PeerState
}
func (psm *peerStateMessage) handle(rm *RequestManager) {
peerStats := rm.peerStats(psm.p)
select {
case psm.peerStatsChan <- peerStats:
case <-rm.ctx.Done():
}
}