diff --git a/chain/indexer.go b/chain/indexer.go index bb193ac79..1bf9bc9af 100644 --- a/chain/indexer.go +++ b/chain/indexer.go @@ -18,6 +18,9 @@ import ( "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" visormodel "github.com/filecoin-project/sentinel-visor/model/visor" + "github.com/filecoin-project/sentinel-visor/tasks/actorstate" + "github.com/filecoin-project/sentinel-visor/tasks/blocks" + "github.com/filecoin-project/sentinel-visor/tasks/chaineconomics" "github.com/filecoin-project/sentinel-visor/tasks/messages" "github.com/filecoin-project/sentinel-visor/tasks/msapprovals" ) @@ -83,45 +86,45 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n for _, task := range tasks { switch task { case BlocksTask: - tsi.processors[BlocksTask] = NewBlockProcessor() + tsi.processors[BlocksTask] = blocks.NewTask() case MessagesTask: tsi.messageProcessors[MessagesTask] = messages.NewTask(o) case ChainEconomicsTask: - tsi.processors[ChainEconomicsTask] = NewChainEconomicsProcessor(o) + tsi.processors[ChainEconomicsTask] = chaineconomics.NewTask(o) case ActorStatesRawTask: - tsi.actorProcessors[ActorStatesRawTask] = NewActorStateProcessor(o, &RawActorExtractorMap{}) + tsi.actorProcessors[ActorStatesRawTask] = actorstate.NewTask(o, &actorstate.RawActorExtractorMap{}) case ActorStatesPowerTask: - tsi.actorProcessors[ActorStatesPowerTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{ + tsi.actorProcessors[ActorStatesPowerTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{ CodeV1: sa0builtin.StoragePowerActorCodeID, CodeV2: sa2builtin.StoragePowerActorCodeID, CodeV3: sa3builtin.StoragePowerActorCodeID, }) case ActorStatesRewardTask: - tsi.actorProcessors[ActorStatesRewardTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{ + tsi.actorProcessors[ActorStatesRewardTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{ CodeV1: sa0builtin.RewardActorCodeID, CodeV2: sa2builtin.RewardActorCodeID, CodeV3: sa3builtin.RewardActorCodeID, }) case ActorStatesMinerTask: - tsi.actorProcessors[ActorStatesMinerTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{ + tsi.actorProcessors[ActorStatesMinerTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{ CodeV1: sa0builtin.StorageMinerActorCodeID, CodeV2: sa2builtin.StorageMinerActorCodeID, CodeV3: sa3builtin.StorageMinerActorCodeID, }) case ActorStatesInitTask: - tsi.actorProcessors[ActorStatesInitTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{ + tsi.actorProcessors[ActorStatesInitTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{ CodeV1: sa0builtin.InitActorCodeID, CodeV2: sa2builtin.InitActorCodeID, CodeV3: sa3builtin.InitActorCodeID, }) case ActorStatesMarketTask: - tsi.actorProcessors[ActorStatesMarketTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{ + tsi.actorProcessors[ActorStatesMarketTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{ CodeV1: sa0builtin.StorageMarketActorCodeID, CodeV2: sa2builtin.StorageMarketActorCodeID, CodeV3: sa3builtin.StorageMarketActorCodeID, }) case ActorStatesMultisigTask: - tsi.actorProcessors[ActorStatesMultisigTask] = NewActorStateProcessor(o, &TypedActorExtractorMap{ + tsi.actorProcessors[ActorStatesMultisigTask] = actorstate.NewTask(o, &actorstate.TypedActorExtractorMap{ CodeV1: sa0builtin.MultisigActorCodeID, CodeV2: sa2builtin.MultisigActorCodeID, CodeV3: sa3builtin.MultisigActorCodeID, diff --git a/chain/actor.go b/tasks/actorstate/task.go similarity index 70% rename from chain/actor.go rename to tasks/actorstate/task.go index a44881c7d..446797334 100644 --- a/chain/actor.go +++ b/tasks/actorstate/task.go @@ -1,4 +1,4 @@ -package chain +package actorstate import ( "context" @@ -15,33 +15,32 @@ import ( "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model" visormodel "github.com/filecoin-project/sentinel-visor/model/visor" - "github.com/filecoin-project/sentinel-visor/tasks/actorstate" ) -// An ActorStateProcessor processes the extraction of actor state according the allowed types in its extracter map. -type ActorStateProcessor struct { +// A Task processes the extraction of actor state according the allowed types in its extracter map. +type Task struct { node lens.API opener lens.APIOpener closer lens.APICloser extracterMap ActorExtractorMap } -func NewActorStateProcessor(opener lens.APIOpener, extracterMap ActorExtractorMap) *ActorStateProcessor { - p := &ActorStateProcessor{ +func NewTask(opener lens.APIOpener, extracterMap ActorExtractorMap) *Task { + p := &Task{ opener: opener, extracterMap: extracterMap, } return p } -func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) { - if p.node == nil { - node, closer, err := p.opener.Open(ctx) +func (t *Task) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) { + if t.node == nil { + node, closer, err := t.opener.Open(ctx) if err != nil { return nil, nil, xerrors.Errorf("unable to open lens: %w", err) } - p.node = node - p.closer = closer + t.node = node + t.closer = closer } log.Debugw("processing actor state changes", "height", ts.Height(), "parent_height", pts.Height()) @@ -57,7 +56,7 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe // Filter to just allowed actors actors := map[string]types.Actor{} for addr, act := range candidates { - if p.extracterMap.Allow(act.Code) { + if t.extracterMap.Allow(act.Code) { actors[addr] = act } } @@ -77,7 +76,7 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe // Run each task concurrently results := make(chan *ActorStateResult, len(actors)) for addr, act := range actors { - go p.runActorStateExtraction(ctx, ts, pts, addr, act, results) + go t.runActorStateExtraction(ctx, ts, pts, addr, act, results) } // Gather results @@ -86,13 +85,13 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe res := <-results inFlight-- elapsed := time.Since(start) - lla := log.With("height", int64(ts.Height()), "actor", actorstate.ActorNameByCode(res.Code), "address", res.Address) + lla := log.With("height", int64(ts.Height()), "actor", ActorNameByCode(res.Code), "address", res.Address) if res.Error != nil { lla.Errorw("actor returned with error", "error", res.Error.Error()) report.ErrorsDetected = append(errorsDetected, &ActorStateError{ Code: res.Code.String(), - Name: actorstate.ActorNameByCode(res.Code), + Name: ActorNameByCode(res.Code), Head: res.Head.String(), Address: res.Address, Error: res.Error.Error(), @@ -120,8 +119,8 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe return data, report, nil } -func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) { - ctx, _ = tag.New(ctx, tag.Upsert(metrics.ActorCode, actorstate.ActorNameByCode(act.Code))) +func (t *Task) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pts *types.TipSet, addrStr string, act types.Actor, results chan *ActorStateResult) { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ActorCode, ActorNameByCode(act.Code))) res := &ActorStateResult{ Code: act.Code, @@ -138,7 +137,7 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t return } - info := actorstate.ActorInfo{ + info := ActorInfo{ Actor: act, Address: addr, ParentStateRoot: pts.ParentState(), @@ -147,12 +146,12 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t ParentTipSet: pts.Parents(), } - extracter, ok := p.extracterMap.GetExtractor(act.Code) + extracter, ok := t.extracterMap.GetExtractor(act.Code) if !ok { res.SkippedParse = true } else { // Parse state - data, err := extracter.Extract(ctx, info, p.node) + data, err := extracter.Extract(ctx, info, t.node) if err != nil { res.Error = xerrors.Errorf("failed to extract parsed actor state: %w", err) return @@ -161,12 +160,12 @@ func (p *ActorStateProcessor) runActorStateExtraction(ctx context.Context, ts *t } } -func (p *ActorStateProcessor) Close() error { - if p.closer != nil { - p.closer() - p.closer = nil +func (t *Task) Close() error { + if t.closer != nil { + t.closer() + t.closer = nil } - p.node = nil + t.node = nil return nil } @@ -190,7 +189,7 @@ type ActorStateError struct { // An ActorExtractorMap controls which actor types may be extracted. type ActorExtractorMap interface { Allow(code cid.Cid) bool - GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool) + GetExtractor(code cid.Cid) (ActorStateExtractor, bool) } type ActorExtractorFilter interface { @@ -204,8 +203,8 @@ func (RawActorExtractorMap) Allow(code cid.Cid) bool { return true } -func (RawActorExtractorMap) GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool) { - return actorstate.ActorExtractor{}, true +func (RawActorExtractorMap) GetExtractor(code cid.Cid) (ActorStateExtractor, bool) { + return ActorExtractor{}, true } // A TypedActorExtractorMap extracts a single type of actor using full parsing of actor state @@ -220,9 +219,9 @@ func (t *TypedActorExtractorMap) Allow(code cid.Cid) bool { return code == t.CodeV1 || code == t.CodeV2 || code == t.CodeV3 } -func (t *TypedActorExtractorMap) GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool) { +func (t *TypedActorExtractorMap) GetExtractor(code cid.Cid) (ActorStateExtractor, bool) { if !t.Allow(code) { return nil, false } - return actorstate.GetActorStateExtractor(code) + return GetActorStateExtractor(code) } diff --git a/chain/block.go b/tasks/blocks/task.go similarity index 70% rename from chain/block.go rename to tasks/blocks/task.go index 047789a04..b482ab8a2 100644 --- a/chain/block.go +++ b/tasks/blocks/task.go @@ -1,4 +1,4 @@ -package chain +package blocks import ( "context" @@ -10,14 +10,14 @@ import ( visormodel "github.com/filecoin-project/sentinel-visor/model/visor" ) -type BlockProcessor struct { +type Task struct { } -func NewBlockProcessor() *BlockProcessor { - return &BlockProcessor{} +func NewTask() *Task { + return &Task{} } -func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { +func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { var pl model.PersistableList for _, bh := range ts.Blocks() { select { @@ -39,6 +39,6 @@ func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (m return pl, report, nil } -func (p *BlockProcessor) Close() error { +func (p *Task) Close() error { return nil } diff --git a/tasks/chain/economics.go b/tasks/chaineconomics/economics.go similarity index 98% rename from tasks/chain/economics.go rename to tasks/chaineconomics/economics.go index 6f19252d0..d7edb85d2 100644 --- a/tasks/chain/economics.go +++ b/tasks/chaineconomics/economics.go @@ -1,4 +1,4 @@ -package chain +package chaineconomics import ( "context" diff --git a/tasks/chain/economics_test.go b/tasks/chaineconomics/economics_test.go similarity index 98% rename from tasks/chain/economics_test.go rename to tasks/chaineconomics/economics_test.go index 6ac0a1a43..bcc6dadc4 100644 --- a/tasks/chain/economics_test.go +++ b/tasks/chaineconomics/economics_test.go @@ -1,4 +1,4 @@ -package chain +package chaineconomics import ( "context" diff --git a/chain/economics.go b/tasks/chaineconomics/task.go similarity index 68% rename from chain/economics.go rename to tasks/chaineconomics/task.go index f6a2f533d..9c2beb9f6 100644 --- a/chain/economics.go +++ b/tasks/chaineconomics/task.go @@ -1,30 +1,32 @@ -package chain +package chaineconomics import ( "context" "github.com/filecoin-project/lotus/chain/types" + logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" "github.com/filecoin-project/sentinel-visor/lens" "github.com/filecoin-project/sentinel-visor/model" visormodel "github.com/filecoin-project/sentinel-visor/model/visor" - "github.com/filecoin-project/sentinel-visor/tasks/chain" ) -type ChainEconomicsProcessor struct { +var log = logging.Logger("chaineconomics") + +type Task struct { node lens.API opener lens.APIOpener closer lens.APICloser } -func NewChainEconomicsProcessor(opener lens.APIOpener) *ChainEconomicsProcessor { - return &ChainEconomicsProcessor{ +func NewTask(opener lens.APIOpener) *Task { + return &Task{ opener: opener, } } -func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { +func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { if p.node == nil { node, closer, err := p.opener.Open(ctx) if err != nil { @@ -40,7 +42,7 @@ func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.T StateRoot: ts.ParentState().String(), } - ce, err := chain.ExtractChainEconomicsModel(ctx, p.node, ts) + ce, err := ExtractChainEconomicsModel(ctx, p.node, ts) if err != nil { log.Errorw("error received while extracting chain economics, closing lens", "error", err) if cerr := p.Close(); cerr != nil { @@ -52,7 +54,7 @@ func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.T return ce, report, nil } -func (p *ChainEconomicsProcessor) Close() error { +func (p *Task) Close() error { if p.closer != nil { p.closer() p.closer = nil