-
Notifications
You must be signed in to change notification settings - Fork 59
/
dtutils.go
174 lines (158 loc) · 7.85 KB
/
dtutils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Package dtutils provides event listeners for the client and provider to
// listen for events on the data transfer module and dispatch FSM events based on them
package dtutils
import (
"fmt"
bstore "github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/storeutil"
logging "github.com/ipfs/go-log/v2"
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
dtgs "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
)
var log = logging.Logger("storagemarket_impl")
// EventReceiver is any thing that can receive FSM events
type EventReceiver interface {
Send(id interface{}, name fsm.EventName, args ...interface{}) (err error)
}
// ProviderDataTransferSubscriber is the function called when an event occurs in a data
// transfer received by a provider -- it reads the voucher to verify this event occurred
// in a storage market deal, then, based on the data transfer event that occurred, it generates
// and update message for the deal -- either moving to staged for a completion
// event or moving to error if a data transfer error occurs
func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
node := channelState.Voucher()
if node.Voucher == nil {
log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID",
channelState.ChannelID())
return
}
voucherIface, err := requestvalidation.BindnodeRegistry.TypeFromNode(node.Voucher, &requestvalidation.StorageDataTransferVoucher{})
// if this event is for a transfer not related to storage, ignore
if err != nil {
log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID",
channelState.ChannelID())
return
}
voucher, _ := voucherIface.(*requestvalidation.StorageDataTransferVoucher) // safe to assume type
log.Debugw("processing storage provider dt event", "event", datatransfer.Events[event.Code], "proposalCid", voucher.Proposal, "channelID",
channelState.ChannelID(), "channelState", datatransfer.Statuses[channelState.Status()])
if channelState.Status() == datatransfer.Completed {
err := deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCompleted)
if err != nil {
log.Errorf("processing dt event: %s", err)
return
}
}
// Translate from data transfer events to provider FSM events
// Note: We ignore data transfer progress events (they do not affect deal state)
err = func() error {
switch event.Code {
case datatransfer.Cancel:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferCancelled)
case datatransfer.Restart:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferRestarted, channelState.ChannelID())
case datatransfer.Disconnected:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferStalled)
case datatransfer.Open:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferInitiated, channelState.ChannelID())
case datatransfer.Error:
return deals.Send(voucher.Proposal, storagemarket.ProviderEventDataTransferFailed, fmt.Errorf("deal data transfer failed: %s", event.Message))
default:
return nil
}
}()
if err != nil {
log.Errorw("error processing storage provider dt event", "event", datatransfer.Events[event.Code], "proposalCid", voucher.Proposal, "channelID",
channelState.ChannelID(), "err", err)
}
}
}
// ClientDataTransferSubscriber is the function called when an event occurs in a data
// transfer initiated on the client -- it reads the voucher to verify this even occurred
// in a storage market deal, then, based on the data transfer event that occurred, it dispatches
// an event to the appropriate state machine
func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
// TODO: are these log messages valid for Client?
node := channelState.Voucher()
if node.Voucher == nil {
log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID",
channelState.ChannelID())
return
}
voucherIface, err := requestvalidation.BindnodeRegistry.TypeFromNode(node.Voucher, &requestvalidation.StorageDataTransferVoucher{})
// if this event is for a transfer not related to storage, ignore
if err != nil {
log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID",
channelState.ChannelID())
return
}
voucher, _ := voucherIface.(*requestvalidation.StorageDataTransferVoucher) // safe to assume type
// Note: We ignore data transfer progress events (they do not affect deal state)
log.Debugw("processing storage client dt event", "event", datatransfer.Events[event.Code], "proposalCid", voucher.Proposal, "channelID",
channelState.ChannelID(), "channelState", datatransfer.Statuses[channelState.Status()])
if channelState.Status() == datatransfer.Completed {
err := deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferComplete)
if err != nil {
log.Errorf("processing dt event: %s", err)
return
}
}
err = func() error {
switch event.Code {
case datatransfer.Cancel:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferCancelled)
case datatransfer.Restart:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferRestarted, channelState.ChannelID())
case datatransfer.Disconnected:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferStalled)
case datatransfer.Accept:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferQueued, channelState.ChannelID())
case datatransfer.TransferInitiated:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferInitiated, channelState.ChannelID())
case datatransfer.Error:
return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferFailed, fmt.Errorf("deal data transfer failed: %s", event.Message))
default:
return nil
}
}()
if err != nil {
log.Errorw("error processing storage client dt event", "event", datatransfer.Events[event.Code], "proposalCid", voucher.Proposal, "channelID",
channelState.ChannelID(), "err", err)
}
}
}
// StoreGetter retrieves the store for a given proposal cid
type StoreGetter interface {
Get(proposalCid cid.Cid) (bstore.Blockstore, error)
}
// TransportConfigurer configurers the graphsync transport to use a custom blockstore per deal
func TransportConfigurer(storeGetter StoreGetter) datatransfer.TransportConfigurer {
return func(channelID datatransfer.ChannelID, voucher datatransfer.TypedVoucher) []datatransfer.TransportOption {
if voucher.Voucher == nil {
log.Errorf("attempting to configure data store, empty voucher")
return nil
}
voucherIface, err := requestvalidation.BindnodeRegistry.TypeFromNode(voucher.Voucher, &requestvalidation.StorageDataTransferVoucher{})
// if this event is for a transfer not related to storage, ignore
if err != nil {
log.Errorf("attempting to configure data store, bad voucher: %s", err)
return nil
}
storageVoucher, _ := voucherIface.(*requestvalidation.StorageDataTransferVoucher) // safe to assume type
store, err := storeGetter.Get(storageVoucher.Proposal)
if err != nil {
log.Errorf("attempting to configure data store: %s", err)
return nil
}
if store == nil {
return nil
}
return []datatransfer.TransportOption{dtgs.UseStore(storeutil.LinkSystemForBlockstore(store))}
}
}