-
Notifications
You must be signed in to change notification settings - Fork 462
/
waiter.go
347 lines (308 loc) · 11.9 KB
/
waiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
package chain
import (
"context"
"fmt"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/venus/pkg/constants"
"github.com/filecoin-project/venus/pkg/vm"
"github.com/filecoin-project/venus/venus-shared/types"
bstore "github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/pkg/errors"
)
type MsgLookup struct {
Message cid.Cid // Can be different than requested, in case it was replaced, but only gas values changed
Receipt types.MessageReceipt
ReturnDec interface{}
TipSet types.TipSetKey
Height abi.ChainEpoch
}
// Abstracts over a store of blockchain state.
type waiterChainReader interface {
GetHead() *types.TipSet
GetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
LookupID(context.Context, *types.TipSet, address.Address) (address.Address, error)
GetActorAt(context.Context, *types.TipSet, address.Address) (*types.Actor, error)
GetTipSetReceiptsRoot(context.Context, *types.TipSet) (cid.Cid, error)
SubHeadChanges(context.Context) chan []*types.HeadChange
}
type IStmgr interface {
GetActorAt(context.Context, address.Address, *types.TipSet) (*types.Actor, error)
RunStateTransition(context.Context, *types.TipSet, vm.ExecCallBack, bool) (root cid.Cid, receipts cid.Cid, err error)
}
// Waiter waits for a message to appear on chain.
type Waiter struct {
chainReader waiterChainReader
messageProvider MessageProvider
cst cbor.IpldStore
bs bstore.Blockstore
Stmgr IStmgr
}
// WaitPredicate is a function that identifies a message and returns true when found.
type WaitPredicate func(msg *types.Message, msgCid cid.Cid) bool
// NewWaiter returns a new Waiter.
func NewWaiter(chainStore waiterChainReader, messages MessageProvider, bs bstore.Blockstore, cst cbor.IpldStore) *Waiter {
return &Waiter{
chainReader: chainStore,
cst: cst,
bs: bs,
messageProvider: messages,
}
}
// Find searches the blockchain history (but doesn't wait).
func (w *Waiter) Find(ctx context.Context, msg types.ChainMsg, lookback abi.ChainEpoch, ts *types.TipSet, allowReplaced bool) (*types.ChainMessage, bool, error) {
if ts == nil {
ts = w.chainReader.GetHead()
}
return w.findMessage(ctx, ts, msg, lookback, allowReplaced)
}
// WaitPredicate invokes the callback when the passed predicate succeeds.
// See api description.
//
// Note: this method does too much -- the callback should just receive the tipset
// containing the message and the caller should pull the receipt out of the block
// if in fact that's what it wants to do, using something like receiptFromTipset.
// Something like receiptFromTipset is necessary because not every message in
// a block will have a receipt in the tipset: it might be a duplicate message.
// This method will always check for the message in the current head tipset.
// A lookback parameter > 1 will cause this method to check for the message in
// up to that many previous tipsets on the chain of the current head.
func (w *Waiter) WaitPredicate(ctx context.Context, msg types.ChainMsg, confidence uint64, lookback abi.ChainEpoch, allowReplaced bool) (*types.ChainMessage, error) {
ch := w.chainReader.SubHeadChanges(ctx)
chainMsg, found, err := w.waitForMessage(ctx, ch, msg, confidence, lookback, allowReplaced)
if err != nil {
return nil, err
}
if found {
return chainMsg, nil
}
return nil, nil
}
// Wait uses WaitPredicate to invoke the callback when a message with the given cid appears on chain.
func (w *Waiter) Wait(ctx context.Context, msg types.ChainMsg, confidence uint64, lookbackLimit abi.ChainEpoch, allowReplaced bool) (*types.ChainMessage, error) {
mid := msg.VMMessage().Cid()
log.Infof("Calling Waiter.Wait CID: %s", mid.String())
return w.WaitPredicate(ctx, msg, confidence, lookbackLimit, allowReplaced)
}
// findMessage looks for a matching in the chain and returns the message,
// block and receipt, when it is found. Returns the found message/block or nil
// if now block with the given CID exists in the chain.
// The lookback parameter is the number of tipsets in the past this method will check before giving up.
func (w *Waiter) findMessage(ctx context.Context, from *types.TipSet, m types.ChainMsg, lookback abi.ChainEpoch, allowReplaced bool) (*types.ChainMessage, bool, error) {
limitHeight := from.Height() - lookback
noLimit := lookback == constants.LookbackNoLimit
cur := from
curActor, err := w.Stmgr.GetActorAt(ctx, m.VMMessage().From, cur)
if err != nil {
return nil, false, fmt.Errorf("failed to load from actor")
}
mNonce := m.VMMessage().Nonce
for {
// If we've reached the genesis block, or we've reached the limit of
// how far back to look
if cur.Height() == 0 || !noLimit && cur.Height() <= limitHeight {
// it ain't here!
return nil, false, nil
}
select {
case <-ctx.Done():
return nil, false, nil
default:
}
// we either have no messages from the sender, or the latest message we found has a lower nonce than the one being searched for,
// either way, no reason to lookback, it ain't there
if curActor == nil || curActor.Nonce == 0 || curActor.Nonce < mNonce {
return nil, false, nil
}
pts, err := w.chainReader.GetTipSet(ctx, cur.Parents())
if err != nil {
return nil, false, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
}
act, err := w.Stmgr.GetActorAt(ctx, m.VMMessage().From, pts)
actorNoExist := errors.Is(err, types.ErrActorNotFound)
if err != nil && !actorNoExist {
return nil, false, fmt.Errorf("failed to load the actor: %w", err)
}
// check that between cur and parent tipset the nonce fell into range of our message
if actorNoExist || (curActor.Nonce > mNonce && act.Nonce <= mNonce) {
msg, found, err := w.receiptForTipset(ctx, cur, m, allowReplaced)
if err != nil {
log.Errorf("Waiter.Wait: %s", err)
return nil, false, err
}
if found {
return msg, true, nil
}
}
cur = pts
curActor = act
}
}
// waitForMessage looks for a matching message in a channel of tipsets and returns
// the message, block and receipt, when it is found. Reads until the channel is
// closed or the context done. Returns the found message/block (or nil if the
// channel closed without finding it), whether it was found, or an error.
// notice matching mesage by message from and nonce. the return message may not be
// expected, because there maybe another message have the same from and nonce value
func (w *Waiter) waitForMessage(ctx context.Context, ch <-chan []*types.HeadChange, msg types.ChainMsg, confidence uint64, lookbackLimit abi.ChainEpoch, allowReplaced bool) (*types.ChainMessage, bool, error) {
current, ok := <-ch
if !ok {
return nil, false, fmt.Errorf("SubHeadChanges stream was invalid")
}
// todo message wait
if len(current) != 1 {
return nil, false, fmt.Errorf("SubHeadChanges first entry should have been one item")
}
if current[0].Type != types.HCCurrent {
return nil, false, fmt.Errorf("expected current head on SHC stream (got %s)", current[0].Type)
}
currentHead := current[0].Val
chainMsg, found, err := w.receiptForTipset(ctx, currentHead, msg, allowReplaced)
if err != nil {
return nil, false, err
}
if found {
return chainMsg, found, nil
}
var backRcp *types.ChainMessage
backSearchWait := make(chan struct{})
go func() {
r, foundMsg, err := w.findMessage(ctx, currentHead, msg, lookbackLimit, allowReplaced)
if err != nil {
log.Warnf("failed to look back through chain for message: %w", err)
return
}
if foundMsg {
backRcp = r
close(backSearchWait)
}
}()
var candidateTS *types.TipSet
var candidateRcp *types.ChainMessage
heightOfHead := currentHead.Height()
reverts := map[string]bool{}
for {
select {
case notif, ok := <-ch:
if !ok {
return nil, false, err
}
for _, val := range notif {
switch val.Type {
case types.HCRevert:
if val.Val.Equals(candidateTS) {
candidateTS = nil
candidateRcp = nil
}
if backSearchWait != nil {
reverts[val.Val.Key().String()] = true
}
case types.HCApply:
if candidateTS != nil && val.Val.Height() >= candidateTS.Height()+abi.ChainEpoch(confidence) {
return candidateRcp, true, nil
}
r, foundMsg, err := w.receiptForTipset(ctx, val.Val, msg, allowReplaced)
if err != nil {
return nil, false, err
}
if r != nil {
if confidence == 0 {
return r, foundMsg, err
}
candidateTS = val.Val
candidateRcp = r
}
heightOfHead = val.Val.Height()
}
}
case <-backSearchWait:
// check if we found the message in the chain and that is hasn't been reverted since we started searching
if backRcp != nil && !reverts[backRcp.TS.Key().String()] {
// if head is at or past confidence interval, return immediately
if heightOfHead >= backRcp.TS.Height()+abi.ChainEpoch(confidence) {
return backRcp, true, nil
}
// wait for confidence interval
candidateTS = backRcp.TS
candidateRcp = backRcp
}
reverts = nil
backSearchWait = nil
case <-ctx.Done():
return nil, false, err
}
}
}
func (w *Waiter) receiptForTipset(ctx context.Context, ts *types.TipSet, msg types.ChainMsg, allowReplaced bool) (*types.ChainMessage, bool, error) {
// The genesis block
if ts.Height() == 0 {
return nil, false, nil
}
pts, err := w.chainReader.GetTipSet(ctx, ts.Parents())
if err != nil {
return nil, false, err
}
blockMessageInfos, err := w.messageProvider.LoadTipSetMessage(ctx, pts)
if err != nil {
return nil, false, err
}
expectedMsg := msg.VMMessage()
expectedCid := msg.Cid()
expectedNonce := msg.VMMessage().Nonce
expectedFrom := msg.VMMessage().From
for _, bms := range blockMessageInfos {
for _, msg := range append(bms.BlsMessages, bms.SecpkMessages...) {
msgCid := msg.Cid()
if msg.VMMessage().From == expectedFrom { // cheaper to just check origin first
if msg.VMMessage().Nonce == expectedNonce {
if !msg.VMMessage().EqualCall(expectedMsg) {
// this is an entirely different message, fail
return nil, false, fmt.Errorf("found message with equal nonce as the one we are looking for that is NOT a valid replacement message (F:%s n %d, TS: %s n%d)",
expectedMsg.Cid(), expectedMsg.Nonce, msg.Cid(), msg.VMMessage().Nonce)
}
if msgCid != expectedCid {
if !allowReplaced {
log.Warnw("found message with equal nonce and call params but different CID",
"wanted", expectedCid, "found", msgCid, "nonce", expectedNonce, "from", expectedFrom)
return nil, false, fmt.Errorf("found message with equal nonce as the one we are looking for (F:%s n %d, TS: %s n%d)",
expectedCid, expectedNonce, msgCid, msg.VMMessage().Nonce)
}
}
recpt, err := w.receiptByIndex(ctx, pts, msgCid, blockMessageInfos)
if err != nil {
return nil, false, errors.Wrap(err, "error retrieving receipt from tipset")
}
return &types.ChainMessage{TS: ts, Message: msg, Block: bms.Block, Receipt: recpt}, true, nil
}
}
}
}
return nil, false, nil
}
func (w *Waiter) receiptByIndex(ctx context.Context, ts *types.TipSet, targetCid cid.Cid, blockMsgs []types.BlockMessagesInfo) (*types.MessageReceipt, error) {
var receiptCid cid.Cid
var err error
if _, receiptCid, err = w.Stmgr.RunStateTransition(ctx, ts, nil, false); err != nil {
return nil, fmt.Errorf("RunStateTransition failed:%w", err)
}
receipts, err := w.messageProvider.LoadReceipts(ctx, receiptCid)
if err != nil {
return nil, err
}
receiptIndex := 0
for _, blkInfo := range blockMsgs {
// todo aggrate bls and secp msg to one msg
for _, msg := range append(blkInfo.BlsMessages, blkInfo.SecpkMessages...) {
if msg.Cid().Equals(targetCid) {
if receiptIndex >= len(receipts) {
return nil, errors.Errorf("could not find message receipt at index %d", receiptIndex)
}
return &receipts[receiptIndex], nil
}
receiptIndex++
}
}
return nil, errors.Errorf("could not find message cid %s in dedupped messages", targetCid.String())
}