forked from filecoin-project/go-fil-markets
-
Notifications
You must be signed in to change notification settings - Fork 2
/
dtutils.go
227 lines (197 loc) · 8.5 KB
/
dtutils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// Package dtutils provides event listeners for the client and provider to
// listen for events on the data transfer module and dispatch FSM events based on them
package dtutils
import (
"fmt"
"math"
bstore "github.com/ipfs/boxo/blockstore"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/filecoin-project/boost-graphsync/storeutil"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-statemachine/fsm"
rm "github.com/filecoin-project/boost-gfm/retrievalmarket"
"github.com/filecoin-project/boost-gfm/retrievalmarket/migrations"
)
// log is the package-level logger, registered under the name "retrievalmarket_impl".
var log = logging.Logger("retrievalmarket_impl")
// EventReceiver is implemented by anything that can be sent FSM events.
//
// Send takes the identifier the event is addressed to, the name of the FSM
// event, and any arguments that accompany the event. It returns an error if
// the event could not be handled.
type EventReceiver interface {
	Send(id interface{}, name fsm.EventName, args ...interface{}) error
}
// noProviderEvent is the sentinel providerEvent returns for data transfer
// events that have no provider FSM counterpart.
const noProviderEvent rm.ProviderEvent = math.MaxUint64
// providerEvent translates a data transfer event into the retrieval provider
// FSM event it should trigger, together with the arguments for that event.
//
// Event codes that are not handled here yield noProviderEvent and nil arguments.
func providerEvent(event datatransfer.Event, channelState datatransfer.ChannelState) (rm.ProviderEvent, []interface{}) {
	mapped := noProviderEvent
	var args []interface{}

	switch code := event.Code; code {
	case datatransfer.Accept:
		// the accepted event carries the ID of the channel the deal runs on
		mapped, args = rm.ProviderEventDealAccepted, []interface{}{channelState.ChannelID()}
	case datatransfer.Cancel:
		mapped = rm.ProviderEventClientCancelled
	case datatransfer.Disconnected:
		mapped, args = rm.ProviderEventDataTransferError, []interface{}{fmt.Errorf("deal data transfer stalled (peer hungup)")}
	case datatransfer.Error:
		mapped, args = rm.ProviderEventDataTransferError, []interface{}{fmt.Errorf("deal data transfer failed: %s", event.Message)}
	}

	return mapped, args
}
// ProviderDataTransferSubscriber returns the data transfer subscriber used on
// the retrieval provider.
//
// The subscriber only acts on channels whose voucher is a retrieval deal
// proposal. For those, it first sends ProviderEventComplete if the channel
// status is Completed, and then sends the FSM event (if any) that providerEvent
// maps the data transfer event to. Events are addressed to the deal identified
// by the proposal's ID and the channel recipient; send failures are only logged.
func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
	return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
		proposal, isRetrieval := dealProposalFromVoucher(channelState.Voucher())
		if !isRetrieval {
			// the voucher is not a retrieval deal proposal, so this transfer is not ours
			return
		}

		// dispatch forwards one event to the deal's state machine and logs any failure
		dispatch := func(evt rm.ProviderEvent, args ...interface{}) {
			dealKey := rm.ProviderDealIdentifier{DealID: proposal.ID, Receiver: channelState.Recipient()}
			if sendErr := deals.Send(dealKey, evt, args...); sendErr != nil {
				log.Errorf("processing dt event: %s", sendErr)
			}
		}

		// completion is detected from the channel status, independently of the event code
		if channelState.Status() == datatransfer.Completed {
			dispatch(rm.ProviderEventComplete)
		}

		fsmEvent, args := providerEvent(event, channelState)
		if fsmEvent == noProviderEvent {
			return
		}

		log.Debugw("processing retrieval provider dt event",
			"event", datatransfer.Events[event.Code],
			"dealID", proposal.ID,
			"peer", channelState.OtherPeer(),
			"retrievalEvent", rm.ProviderEvents[fsmEvent])
		dispatch(fsmEvent, args...)
	}
}
// clientEventForResponse maps the status of a provider's deal response to the
// retrieval client FSM event it should trigger, together with that event's
// arguments: the response message for rejections and not-found, the payment
// owed for the funds-needed and ongoing statuses, and the status itself when
// it is not one of the recognized values.
func clientEventForResponse(response *rm.DealResponse) (rm.ClientEvent, []interface{}) {
	var (
		evt  rm.ClientEvent
		args []interface{}
	)

	switch status := response.Status; status {
	case rm.DealStatusAccepted:
		evt = rm.ClientEventDealAccepted
	case rm.DealStatusCompleted:
		evt = rm.ClientEventComplete
	case rm.DealStatusRejected:
		evt, args = rm.ClientEventDealRejected, []interface{}{response.Message}
	case rm.DealStatusDealNotFound:
		evt, args = rm.ClientEventDealNotFound, []interface{}{response.Message}
	case rm.DealStatusFundsNeededUnseal:
		evt, args = rm.ClientEventUnsealPaymentRequested, []interface{}{response.PaymentOwed}
	case rm.DealStatusFundsNeededLastPayment:
		evt, args = rm.ClientEventLastPaymentRequested, []interface{}{response.PaymentOwed}
	case rm.DealStatusFundsNeeded, rm.DealStatusOngoing:
		evt, args = rm.ClientEventPaymentRequested, []interface{}{response.PaymentOwed}
	default:
		evt, args = rm.ClientEventUnknownResponseReceived, []interface{}{status}
	}

	return evt, args
}
// noEvent is the sentinel clientEvent returns for data transfer events that
// have no client FSM counterpart.
const noEvent rm.ClientEvent = math.MaxUint64
// clientEvent translates a data transfer event into the retrieval client FSM
// event it should trigger, together with the arguments for that event.
//
// It returns noEvent and nil arguments when the event code is not handled, or
// when a NewVoucherResult event carries a voucher result that is not a
// retrieval deal response.
func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelState) (rm.ClientEvent, []interface{}) {
	switch event.Code {
	case datatransfer.DataReceivedProgress:
		return rm.ClientEventBlocksReceived, []interface{}{channelState.Received()}
	case datatransfer.FinishTransfer:
		return rm.ClientEventAllBlocksReceived, nil
	case datatransfer.Cancel:
		return rm.ClientEventProviderCancelled, nil
	case datatransfer.NewVoucherResult:
		voucherResult := channelState.LastVoucherResult()
		response, ok := dealResponseFromVoucherResult(voucherResult)
		if !ok {
			// Report the voucher result that could not be interpreted, not the
			// request voucher. %T prints the dynamic type and is safe for nil.
			log.Errorf("unexpected voucher result received: %T", voucherResult)
			return noEvent, nil
		}
		// the deal response status decides which FSM event is fired
		return clientEventForResponse(response)
	case datatransfer.Disconnected:
		return rm.ClientEventDataTransferError, []interface{}{fmt.Errorf("deal data transfer stalled (peer hungup)")}
	case datatransfer.Error:
		// a channel message equal to ErrRejected's text is reported as a deal
		// rejection rather than as a generic transfer failure
		if channelState.Message() == datatransfer.ErrRejected.Error() {
			return rm.ClientEventDealRejected, []interface{}{"rejected for unknown reasons"}
		}
		return rm.ClientEventDataTransferError, []interface{}{fmt.Errorf("deal data transfer failed: %s", event.Message)}
	default:
		return noEvent, nil
	}
}
// ClientDataTransferSubscriber returns the data transfer subscriber used on
// the retrieval client.
//
// The subscriber only acts on channels whose voucher is a retrieval deal
// proposal. For those, it maps the data transfer event to a client FSM event
// via clientEvent and sends it to deals, addressed by the proposal's deal ID.
// Events with no FSM counterpart are dropped; send failures are only logged.
func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
	return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
		proposal, isRetrieval := dealProposalFromVoucher(channelState.Voucher())
		if !isRetrieval {
			// the voucher is not a retrieval deal proposal, so this transfer is not ours
			return
		}

		fsmEvent, args := clientEvent(event, channelState)
		if fsmEvent == noEvent {
			return
		}

		log.Debugw("processing retrieval client dt event",
			"event", datatransfer.Events[event.Code],
			"dealID", proposal.ID,
			"peer", channelState.OtherPeer(),
			"retrievalEvent", rm.ClientEvents[fsmEvent])

		if sendErr := deals.Send(proposal.ID, fsmEvent, args...); sendErr != nil {
			log.Errorf("processing dt event %s for state %s: %s",
				datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()], sendErr)
		}
	}
}
// StoreGetter looks up the blockstore to use for a deal.
//
// Get is given the peer on the other side of the transfer and the deal ID.
// TransportConfigurer treats a nil blockstore returned with a nil error as
// "nothing to configure".
type StoreGetter interface {
	Get(otherPeer peer.ID, dealID rm.DealID) (bstore.Blockstore, error)
}
// StoreConfigurableTransport is the subset of a data transfer transport needed
// to make a single channel use its own store: UseStore associates the given
// link system with the given channel.
type StoreConfigurableTransport interface {
	UseStore(channelID datatransfer.ChannelID, lsys ipld.LinkSystem) error
}
// TransportConfigurer returns a data transfer transport configurer that makes
// the transport use a custom blockstore per retrieval deal.
//
// The configurer does nothing unless the voucher is a retrieval deal proposal
// and the transport implements StoreConfigurableTransport. It then asks
// storeGetter for the store belonging to the other party on the channel and
// the deal ID; if one is returned, the transport is told to use a link system
// backed by that store for the channel. Failures are logged, not returned.
func TransportConfigurer(thisPeer peer.ID, storeGetter StoreGetter) datatransfer.TransportConfigurer {
	return func(channelID datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) {
		proposal, isRetrieval := dealProposalFromVoucher(voucher)
		if !isRetrieval {
			return
		}

		configurable, supported := transport.(StoreConfigurableTransport)
		if !supported {
			return
		}

		dealStore, getErr := storeGetter.Get(channelID.OtherParty(thisPeer), proposal.ID)
		switch {
		case getErr != nil:
			log.Errorf("attempting to configure data store: %s", getErr)
			return
		case dealStore == nil:
			// no dedicated store for this deal; leave the transport as it is
			return
		}

		if useErr := configurable.UseStore(channelID, storeutil.LinkSystemForBlockstore(dealStore)); useErr != nil {
			log.Errorf("attempting to configure data store: %s", useErr)
		}
	}
}
// dealProposalFromVoucher extracts a retrieval deal proposal from a data
// transfer voucher. A current-format proposal is returned as is; a legacy
// DealProposal0 is migrated to the current format first. The boolean is false
// when the voucher is neither, meaning the transfer is not a retrieval deal.
func dealProposalFromVoucher(voucher datatransfer.Voucher) (*rm.DealProposal, bool) {
	switch v := voucher.(type) {
	case *rm.DealProposal:
		return v, true
	case *migrations.DealProposal0:
		migrated := migrations.MigrateDealProposal0To1(*v)
		return &migrated, true
	default:
		return nil, false
	}
}
// dealResponseFromVoucherResult extracts a retrieval deal response from a data
// transfer voucher result. A current-format response is returned as is; a
// legacy DealResponse0 is migrated to the current format first. The boolean is
// false when the voucher result is neither.
func dealResponseFromVoucherResult(vres datatransfer.VoucherResult) (*rm.DealResponse, bool) {
	switch r := vres.(type) {
	case *rm.DealResponse:
		return r, true
	case *migrations.DealResponse0:
		migrated := migrations.MigrateDealResponse0To1(*r)
		return &migrated, true
	default:
		return nil, false
	}
}