/
statemanager.go
107 lines (91 loc) · 3.1 KB
/
statemanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package modules
import (
"context"
"fmt"
"sync"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
// log is the package-level logger, registered under the name "lily/lens".
var log = logging.Logger("lily/lens")
// ErrExecutionTraceNotFound is returned by BufferedExecMonitor.ExecutionFor
// when no executions are buffered for the requested tipset.
var ErrExecutionTraceNotFound = fmt.Errorf("failed to find execution trace")
// StateManager constructs a *stmgr.StateManager from the supplied chain
// store, executor, syscall builder, upgrade schedule, beacon schedule,
// execution monitor, metadata datastore and message index, and ties its
// lifetime to the fx application: Start runs on application start and Stop
// on application stop.
//
// The MetricsCtx argument is accepted but unused. Any error reported by the
// underlying stmgr constructor is returned unchanged, in which case no
// lifecycle hook is registered.
func StateManager(_ helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, exec stmgr.Executor, sys vm.SyscallBuilder, us stmgr.UpgradeSchedule, bs beacon.Schedule, em stmgr.ExecMonitor, metadataDs dtypes.MetadataDS, msgIndex index.MsgIndex) (*stmgr.StateManager, error) {
	manager, err := stmgr.NewStateManagerWithUpgradeScheduleAndMonitor(cs, exec, sys, us, bs, em, metadataDs, msgIndex)
	if err != nil {
		return nil, err
	}

	// Hand start/stop over to fx so the manager follows the app lifecycle.
	hook := fx.Hook{
		OnStart: manager.Start,
		OnStop:  manager.Stop,
	}
	lc.Append(hook)

	return manager, nil
}
// Compile-time assertion that *BufferedExecMonitor implements stmgr.ExecMonitor.
var _ stmgr.ExecMonitor = (*BufferedExecMonitor)(nil)
// NewBufferedExecMonitor returns a BufferedExecMonitor backed by an LRU
// cache with room for 64 entries.
//
// It panics if the cache cannot be created. Per the original author's note
// that only happens when an invalid (negative) size is supplied, which the
// fixed size used here rules out.
func NewBufferedExecMonitor() *BufferedExecMonitor {
	// Number of cache entries retained before the least recently used one
	// is dropped.
	const cacheSize = 64

	c, err := lru.New(cacheSize)
	if err != nil {
		panic(err)
	}

	return &BufferedExecMonitor{cache: c}
}
// BufferedExecMonitor is a stmgr.ExecMonitor that buffers the message
// executions reported through MessageApplied in an LRU cache so they can be
// fetched later, per tipset, with ExecutionFor.
type BufferedExecMonitor struct {
// cacheMu is held by MessageApplied and ExecutionFor for the whole of each
// call, serializing their access to cache.
cacheMu sync.Mutex
// cache maps a tipset key (ts.Key()) to the []*BufferedExecution recorded
// for that tipset.
cache *lru.Cache
}
// BufferedExecution captures the arguments of a single MessageApplied call:
// one message application together with its result.
type BufferedExecution struct {
// TipSet is the tipset passed to MessageApplied alongside the message.
TipSet *types.TipSet
// Mcid is the CID reported for the message.
Mcid cid.Cid
// Msg is the message that was applied.
Msg *types.Message
// Ret is the result of applying the message.
Ret *vm.ApplyRet
// Implicit is the implicit flag as reported to MessageApplied.
Implicit bool
}
// MessageApplied implements stmgr.ExecMonitor. It wraps the reported message
// application in a BufferedExecution and appends it to the list of
// executions buffered for ts, creating that list if this is the first
// message seen for the tipset.
//
// The context argument is unused. The method always returns nil.
//
// NOTE(review): the slice stored in the cache is the same one ExecutionFor
// hands out, so callers of ExecutionFor should treat the result as
// read-only.
func (b *BufferedExecMonitor) MessageApplied(_ context.Context, ts *types.TipSet, mcid cid.Cid, msg *types.Message, ret *vm.ApplyRet, implicit bool) error {
	execution := &BufferedExecution{
		TipSet:   ts,
		Mcid:     mcid,
		Msg:      msg,
		Ret:      ret,
		Implicit: implicit,
	}
	key := ts.Key()

	b.cacheMu.Lock()
	defer b.cacheMu.Unlock()

	// A single Get both tests for presence and fetches the current list.
	if v, found := b.cache.Get(key); found {
		// Re-adding an existing key only replaces its value; it can never
		// evict another entry, so there is nothing to report here.
		b.cache.Add(key, append(v.([]*BufferedExecution), execution))
		return nil
	}

	// First message seen for this tipset. Inserting a new key is the only
	// operation that can push an older tipset out of the cache.
	// TODO it would be nice to know if we have extracted the buffered execution for the evicted tipset already, maybe not important
	if evicted := b.cache.Add(key, []*BufferedExecution{execution}); evicted {
		log.Debugw("Evicted oldest tipset from buffered exec monitor", "added", key)
	}
	return nil
}
// ExecutionFor returns the executions buffered for ts, or
// ErrExecutionTraceNotFound when the cache holds no entry for that tipset.
//
// As long as this method is driven with tipsets obtained from HeadEvents, an
// entry is expected to be present.
func (b *BufferedExecMonitor) ExecutionFor(ts *types.TipSet) ([]*BufferedExecution, error) {
	log.Debugw("execution for", "ts", ts.String())

	b.cacheMu.Lock()
	defer b.cacheMu.Unlock()

	if cached, ok := b.cache.Get(ts.Key()); ok {
		return cached.([]*BufferedExecution), nil
	}
	return nil, ErrExecutionTraceNotFound
}