Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model"
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
"github.com/filecoin-project/sentinel-visor/tasks/blocks"
"github.com/filecoin-project/sentinel-visor/tasks/chaineconomics"
"github.com/filecoin-project/sentinel-visor/tasks/messages"
"github.com/filecoin-project/sentinel-visor/tasks/msapprovals"
)
Expand Down Expand Up @@ -83,45 +86,45 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n
for _, task := range tasks {
switch task {
case BlocksTask:
tsi.processors[BlocksTask] = NewBlockProcessor()
tsi.processors[BlocksTask] = blocks.NewTask()
case MessagesTask:
tsi.messageProcessors[MessagesTask] = messages.NewTask(o)
case ChainEconomicsTask:
tsi.processors[ChainEconomicsTask] = NewChainEconomicsProcessor(o)
tsi.processors[ChainEconomicsTask] = chaineconomics.NewTask(o)
case ActorStatesRawTask:
tsi.actorProcessors[ActorStatesRawTask] = NewActorStateProcessor(o, &RawActorExtractorMap{})
tsi.actorProcessors[ActorStatesRawTask] = actorstate.NewTask(o, &actorstate.RawActorExtractorMap{})
case ActorStatesPowerTask:
tsi.actorProcessors[ActorStatesPowerTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
tsi.actorProcessors[ActorStatesPowerTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
CodeV1: sa0builtin.StoragePowerActorCodeID,
CodeV2: sa2builtin.StoragePowerActorCodeID,
CodeV3: sa3builtin.StoragePowerActorCodeID,
})
case ActorStatesRewardTask:
tsi.actorProcessors[ActorStatesRewardTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
tsi.actorProcessors[ActorStatesRewardTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
CodeV1: sa0builtin.RewardActorCodeID,
CodeV2: sa2builtin.RewardActorCodeID,
CodeV3: sa3builtin.RewardActorCodeID,
})
case ActorStatesMinerTask:
tsi.actorProcessors[ActorStatesMinerTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
tsi.actorProcessors[ActorStatesMinerTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
CodeV1: sa0builtin.StorageMinerActorCodeID,
CodeV2: sa2builtin.StorageMinerActorCodeID,
CodeV3: sa3builtin.StorageMinerActorCodeID,
})
case ActorStatesInitTask:
tsi.actorProcessors[ActorStatesInitTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
tsi.actorProcessors[ActorStatesInitTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
CodeV1: sa0builtin.InitActorCodeID,
CodeV2: sa2builtin.InitActorCodeID,
CodeV3: sa3builtin.InitActorCodeID,
})
case ActorStatesMarketTask:
tsi.actorProcessors[ActorStatesMarketTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
tsi.actorProcessors[ActorStatesMarketTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
CodeV1: sa0builtin.StorageMarketActorCodeID,
CodeV2: sa2builtin.StorageMarketActorCodeID,
CodeV3: sa3builtin.StorageMarketActorCodeID,
})
case ActorStatesMultisigTask:
tsi.actorProcessors[ActorStatesMultisigTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{
tsi.actorProcessors[ActorStatesMultisigTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{
CodeV1: sa0builtin.MultisigActorCodeID,
CodeV2: sa2builtin.MultisigActorCodeID,
CodeV3: sa3builtin.MultisigActorCodeID,
Expand Down
59 changes: 29 additions & 30 deletions chain/actor.go → tasks/actorstate/task.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chain
package actorstate

import (
"context"
Expand All @@ -15,33 +15,32 @@ import (
"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model"
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
)

// An ActorStateProcessor processes the extraction of actor state according the allowed types in its extracter map.
type ActorStateProcessor struct {
// A Task processes the extraction of actor state according the allowed types in its extracter map.
type Task struct {
node lens.API
opener lens.APIOpener
closer lens.APICloser
extracterMap ActorExtractorMap
}

func NewActorStateProcessor(opener lens.APIOpener, extracterMap ActorExtractorMap) *ActorStateProcessor {
p := &ActorStateProcessor{
func NewTask(opener lens.APIOpener, extracterMap ActorExtractorMap) *Task {
p := &Task{
opener: opener,
extracterMap: extracterMap,
}
return p
}

func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) {
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
func (t *Task) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) {
if t.node == nil {
node, closer, err := t.opener.Open(ctx)
if err != nil {
return nil, nil, xerrors.Errorf("unable to open lens: %w", err)
}
p.node = node
p.closer = closer
t.node = node
t.closer = closer
}

log.Debugw("processing actor state changes", "height", ts.Height(), "parent_height", pts.Height())
Expand All @@ -57,7 +56,7 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
// Filter to just allowed actors
actors := map[string]types.Actor{}
for addr, act := range candidates {
if p.extracterMap.Allow(act.Code) {
if t.extracterMap.Allow(act.Code) {
actors[addr] = act
}
}
Expand All @@ -77,7 +76,7 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
// Run each task concurrently
results := make(chan *ActorStateResult, len(actors))
for addr, act := range actors {
go p.runActorStateExtraction(ctx, ts, pts, addr, act, results)
go t.runActorStateExtraction(ctx, ts, pts, addr, act, results)
}

// Gather results
Expand All @@ -86,13 +85,13 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
res := <-results
inFlight--
elapsed := time.Since(start)
lla := log.With("height", int64(ts.Height()), "actor", actorstate.ActorNameByCode(res.Code), "address", res.Address)
lla := log.With("height", int64(ts.Height()), "actor", ActorNameByCode(res.Code), "address", res.Address)

if res.Error != nil {
lla.Errorw("actor returned with error", "error", res.Error.Error())
report.ErrorsDetected = append(errorsDetected, &ActorStateError{
Code: res.Code.String(),
Name: actorstate.ActorNameByCode(res.Code),
Name: ActorNameByCode(res.Code),
Head: res.Head.String(),
Address: res.Address,
Error: res.Error.Error(),
Expand Down Expand Up @@ -120,8 +119,8 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
return data, report, nil
}

func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ActorCode, actorstate.ActorNameByCode(act.Code)))
func (t *Task) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ActorCode, ActorNameByCode(act.Code)))

res := &ActorStateResult{
Code: act.Code,
Expand All @@ -138,7 +137,7 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t
return
}

info := actorstate.ActorInfo{
info := ActorInfo{
Actor: act,
Address: addr,
ParentStateRoot: pts.ParentState(),
Expand All @@ -147,12 +146,12 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t
ParentTipSet: pts.Parents(),
}

extracter, ok := p.extracterMap.GetExtractor(act.Code)
extracter, ok := t.extracterMap.GetExtractor(act.Code)
if !ok {
res.SkippedParse = true
} else {
// Parse state
data, err := extracter.Extract(ctx, info, p.node)
data, err := extracter.Extract(ctx, info, t.node)
if err != nil {
res.Error = xerrors.Errorf("failed to extract parsed actor state: %w", err)
return
Expand All @@ -161,12 +160,12 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t
}
}

func (p *ActorStateProcessor) Close() error {
if p.closer != nil {
p.closer()
p.closer = nil
func (t *Task) Close() error {
if t.closer != nil {
t.closer()
t.closer = nil
}
p.node = nil
t.node = nil
return nil
}

Expand All @@ -190,7 +189,7 @@ type ActorStateError struct {
// An ActorExtractorMap controls which actor types may be extracted.
type ActorExtractorMap interface {
Allow(code cid.Cid) bool
GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool)
GetExtractor(code cid.Cid) (ActorStateExtractor, bool)
}

type ActorExtractorFilter interface {
Expand All @@ -204,8 +203,8 @@ func (RawActorExtractorMap) Allow(code cid.Cid) bool {
return true
}

func (RawActorExtractorMap) GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool) {
return actorstate.ActorExtractor{}, true
func (RawActorExtractorMap) GetExtractor(code cid.Cid) (ActorStateExtractor, bool) {
return ActorExtractor{}, true
}

// A TypedActorExtractorMap extracts a single type of actor using full parsing of actor state
Expand All @@ -220,9 +219,9 @@ func (t *TypedActorExtractorMap) Allow(code cid.Cid) bool {
return code == t.CodeV1 || code == t.CodeV2 || code == t.CodeV3
}

func (t *TypedActorExtractorMap) GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool) {
func (t *TypedActorExtractorMap) GetExtractor(code cid.Cid) (ActorStateExtractor, bool) {
if !t.Allow(code) {
return nil, false
}
return actorstate.GetActorStateExtractor(code)
return GetActorStateExtractor(code)
}
12 changes: 6 additions & 6 deletions chain/block.go → tasks/blocks/task.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chain
package blocks

import (
"context"
Expand All @@ -10,14 +10,14 @@ import (
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
)

type BlockProcessor struct {
type Task struct {
}

func NewBlockProcessor() *BlockProcessor {
return &BlockProcessor{}
func NewTask() *Task {
return &Task{}
}

func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
var pl model.PersistableList
for _, bh := range ts.Blocks() {
select {
Expand All @@ -39,6 +39,6 @@ func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (m
return pl, report, nil
}

func (p *BlockProcessor) Close() error {
func (p *Task) Close() error {
return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chain
package chaineconomics

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chain
package chaineconomics

import (
"context"
Expand Down
18 changes: 10 additions & 8 deletions chain/economics.go → tasks/chaineconomics/task.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
package chain
package chaineconomics

import (
"context"

"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/model"
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
"github.com/filecoin-project/sentinel-visor/tasks/chain"
)

type ChainEconomicsProcessor struct {
var log = logging.Logger("chaineconomics")

type Task struct {
node lens.API
opener lens.APIOpener
closer lens.APICloser
}

func NewChainEconomicsProcessor(opener lens.APIOpener) *ChainEconomicsProcessor {
return &ChainEconomicsProcessor{
func NewTask(opener lens.APIOpener) *Task {
return &Task{
opener: opener,
}
}

func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
if err != nil {
Expand All @@ -40,7 +42,7 @@ func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.T
StateRoot: ts.ParentState().String(),
}

ce, err := chain.ExtractChainEconomicsModel(ctx, p.node, ts)
ce, err := ExtractChainEconomicsModel(ctx, p.node, ts)
if err != nil {
log.Errorw("error received while extracting chain economics, closing lens", "error", err)
if cerr := p.Close(); cerr != nil {
Expand All @@ -52,7 +54,7 @@ func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.T
return ce, report, nil
}

func (p *ChainEconomicsProcessor) Close() error {
func (p *Task) Close() error {
if p.closer != nil {
p.closer()
p.closer = nil
Expand Down