/
task.go
121 lines (106 loc) · 3.69 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package gas_output
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"github.com/filecoin-project/lily/chain/actors/builtin"
"github.com/filecoin-project/lily/model"
derivedmodel "github.com/filecoin-project/lily/model/derived"
visormodel "github.com/filecoin-project/lily/model/visor"
"github.com/filecoin-project/lily/tasks"
"github.com/filecoin-project/lily/tasks/messages"
)
// Task derives gas output models from the executed messages of a tipset.
type Task struct {
// node is the data source queried for executed and block messages.
node tasks.DataSource
}
// NewTask returns a Task that reads its chain data from node.
func NewTask(node tasks.DataSource) *Task {
	return &Task{node: node}
}
// ProcessTipSets builds a derived GasOutputs model for every distinct
// executed message the data source returns for the given pair of tipsets.
//
// A failure to load the messages is recorded on the returned report and the
// method returns a nil model with a nil error. A message that cannot be
// serialized is added to the report's ErrorsDetected and still yields a row,
// with SizeBytes left at zero. If ctx is done while messages are being
// processed, the method returns a nil model, a nil report and an error
// wrapping ctx.Err().
func (t *Task) ProcessTipSets(ctx context.Context, current *types.TipSet, executed *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
	ctx, span := otel.Tracer("").Start(ctx, "ProcessTipSets")
	if span.IsRecording() {
		span.SetAttributes(
			attribute.String("current", current.String()),
			attribute.Int64("current_height", int64(current.Height())),
			attribute.String("executed", executed.String()),
			attribute.Int64("executed_height", int64(executed.Height())),
			attribute.String("processor", "gas_outputs"),
		)
	}
	defer span.End()

	report := &visormodel.ProcessingReport{
		Height:    int64(current.Height()),
		StateRoot: current.ParentState().String(),
	}

	fetched, err := t.node.ExecutedAndBlockMessages(ctx, current, executed)
	if err != nil {
		// The failure is surfaced through the report; the returned error stays nil.
		report.ErrorsDetected = fmt.Errorf("getting executed and block messages: %w", err)
		return nil, report, nil
	}
	executedMsgs := fetched.Executed

	rows := make(derivedmodel.GasOutputsList, 0, len(executedMsgs))
	failures := make([]*messages.MessageError, 0, len(executedMsgs))
	seen := make(map[cid.Cid]struct{}, len(executedMsgs))

	for _, em := range executedMsgs {
		// Bail out promptly once the caller has cancelled the context.
		select {
		case <-ctx.Done():
			return nil, nil, fmt.Errorf("context done: %w", ctx.Err())
		default:
		}

		// Emit at most one row per message CID.
		if _, dup := seen[em.Cid]; dup {
			continue
		}
		seen[em.Cid] = struct{}{}

		msg := em.Message

		// A serialization failure is recorded but does not drop the row;
		// the size simply stays at zero.
		sizeBytes := 0
		raw, serErr := msg.Serialize()
		if serErr != nil {
			failures = append(failures, &messages.MessageError{
				Cid:   em.Cid,
				Error: fmt.Errorf("failed to serialize message: %w", serErr).Error(),
			})
		} else {
			sizeBytes = len(raw)
		}

		toActor := builtin.ActorNameByCode(em.ToActorCode)
		receipt := em.Receipt
		header := em.BlockHeader
		gas := em.GasOutputs

		rows = append(rows, &derivedmodel.GasOutputs{
			Height:             int64(em.Height),
			Cid:                em.Cid.String(),
			From:               msg.From.String(),
			To:                 msg.To.String(),
			Value:              msg.Value.String(),
			GasFeeCap:          msg.GasFeeCap.String(),
			GasPremium:         msg.GasPremium.String(),
			GasLimit:           msg.GasLimit,
			Nonce:              msg.Nonce,
			Method:             uint64(msg.Method),
			StateRoot:          header.ParentStateRoot.String(),
			ExitCode:           int64(receipt.ExitCode),
			GasUsed:            receipt.GasUsed,
			ParentBaseFee:      header.ParentBaseFee.String(),
			SizeBytes:          sizeBytes,
			BaseFeeBurn:        gas.BaseFeeBurn.String(),
			OverEstimationBurn: gas.OverEstimationBurn.String(),
			MinerPenalty:       gas.MinerPenalty.String(),
			MinerTip:           gas.MinerTip.String(),
			Refund:             gas.Refund.String(),
			GasRefund:          gas.GasRefund,
			GasBurned:          gas.GasBurned,
			ActorName:          toActor,
			ActorFamily:        builtin.ActorFamily(toActor),
		})
	}

	// Only attach the per-message errors when at least one occurred.
	if len(failures) > 0 {
		report.ErrorsDetected = failures
	}

	return model.PersistableList{rows}, report, nil
}