Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 52 additions & 50 deletions tasks/actorstate/miner/sector_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/lily/model"
minermodel "github.com/filecoin-project/lily/model/actors/miner"
"github.com/filecoin-project/lily/tasks/actorstate"
"github.com/filecoin-project/lily/tasks/actorstate/miner/extraction"
)

type SectorEventsExtractor struct{}
Expand All @@ -25,27 +26,27 @@ func (SectorEventsExtractor) Extract(ctx context.Context, a actorstate.ActorInfo
span.SetAttributes(a.Attributes()...)
}

ec, err := NewMinerStateExtractionContext(ctx, a, node)
extState, err := extraction.LoadMinerStates(ctx, a, node)
if err != nil {
return nil, fmt.Errorf("creating miner state extraction context: %w", err)
}

var sectorChanges *miner.SectorChanges
var preCommitChanges *miner.PreCommitChanges
if !ec.HasPreviousState() {
if extState.ParentState() == nil {
// If the miner doesn't have previous state list all of its current sectors and precommits
sectors, err := ec.CurrState.LoadSectors(nil)
sectors, err := extState.CurrentState().LoadSectors(nil)
if err != nil {
return nil, fmt.Errorf("loading miner sectors: %w", err)
}

sectorChanges = miner.MakeSectorChanges()
for i, sector := range sectors {
sectorChanges.Added[i] = *sector
for _, sector := range sectors {
sectorChanges.Added = append(sectorChanges.Added, *sector)
Comment on lines -43 to +45
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes a panic as we need to append to Added. This is a rare case when processing the genesis block.

}

preCommitChanges = miner.MakePreCommitChanges()
if err = ec.CurrState.ForEachPrecommittedSector(func(info miner.SectorPreCommitOnChainInfo) error {
if err = extState.CurrentState().ForEachPrecommittedSector(func(info miner.SectorPreCommitOnChainInfo) error {
preCommitChanges.Added = append(preCommitChanges.Added, info)
return nil
}); err != nil {
Expand All @@ -54,37 +55,42 @@ func (SectorEventsExtractor) Extract(ctx context.Context, a actorstate.ActorInfo

} else {
// If the miner has previous state compute the list of new sectors and precommit in its current state.
preCommitChanges, err = node.DiffPreCommits(ctx, a.Address, a.Current, a.Executed, ec.PrevState, ec.CurrState)
preCommitChanges, err = node.DiffPreCommits(ctx, a.Address, a.Current, a.Executed, extState.ParentState(), extState.CurrentState())
if err != nil {
return nil, err
}

sectorChanges, err = node.DiffSectors(ctx, a.Address, a.Current, a.Executed, ec.PrevState, ec.CurrState)
sectorChanges, err = node.DiffSectors(ctx, a.Address, a.Current, a.Executed, extState.ParentState(), extState.CurrentState())
if err != nil {
return nil, err
}
}

sectorEventModel, err := extractSectorEvents(ctx, a, ec, sectorChanges, preCommitChanges)
dlDiff, err := miner.DiffDeadlines(extState.ParentState(), extState.CurrentState())
if err != nil {
return nil, err
}

sectorEventModel, err := ExtractSectorEvents(ctx, extState, sectorChanges, preCommitChanges, dlDiff)
if err != nil {
return nil, err
}

return sectorEventModel, nil
}

func extractSectorEvents(ctx context.Context, a actorstate.ActorInfo, ec *MinerStateExtractionContext, sc *miner.SectorChanges, pc *miner.PreCommitChanges) (minermodel.MinerSectorEventList, error) {
ctx, span := otel.Tracer("").Start(ctx, "extractMinerSectorEvents")
func ExtractSectorEvents(ctx context.Context, extState extraction.State, sc *miner.SectorChanges, pc *miner.PreCommitChanges, dlDiff miner.DeadlinesDiff) (minermodel.MinerSectorEventList, error) {
ctx, span := otel.Tracer("").Start(ctx, "ExtractSectorEvents")
defer span.End()

partitionEvents, err := extractMinerPartitionsDiff(ctx, a, ec)
partitionEvents, err := ExtractMinerPartitionsDiff(ctx, extState, dlDiff)
if err != nil {
return nil, fmt.Errorf("extracting miner partition diff: %w", err)
}

sectorEvents := extractMinerSectorEvents(a, sc)
sectorEvents := ExtractMinerSectorEvents(extState, sc)

preCommitEvents := extractMinerPreCommitEvents(a, pc)
preCommitEvents := ExtractMinerPreCommitEvents(extState, pc)

out := make(minermodel.MinerSectorEventList, 0, len(partitionEvents)+len(sectorEvents)+len(preCommitEvents))
out = append(out, partitionEvents...)
Expand All @@ -94,7 +100,7 @@ func extractSectorEvents(ctx context.Context, a actorstate.ActorInfo, ec *MinerS
return out, nil
}

func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChanges) minermodel.MinerSectorEventList {
func ExtractMinerSectorEvents(extState extraction.State, sectors *miner.SectorChanges) minermodel.MinerSectorEventList {
out := make(minermodel.MinerSectorEventList, 0, len(sectors.Added)+len(sectors.Extended)+len(sectors.Snapped))

// track sector add and commit-capacity add
Expand All @@ -104,9 +110,9 @@ func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChang
event = minermodel.CommitCapacityAdded
}
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: uint64(add.SectorNumber),
Event: event,
})
Expand All @@ -115,9 +121,9 @@ func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChang
// sector extension events
for _, mod := range sectors.Extended {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: uint64(mod.To.SectorNumber),
Event: minermodel.SectorExtended,
})
Expand All @@ -126,9 +132,9 @@ func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChang
// sector snapped events
for _, snap := range sectors.Snapped {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: uint64(snap.To.SectorNumber),
Event: minermodel.SectorSnapped,
})
Expand All @@ -137,14 +143,14 @@ func extractMinerSectorEvents(a actorstate.ActorInfo, sectors *miner.SectorChang
return out
}

func extractMinerPreCommitEvents(a actorstate.ActorInfo, preCommits *miner.PreCommitChanges) minermodel.MinerSectorEventList {
func ExtractMinerPreCommitEvents(extState extraction.State, preCommits *miner.PreCommitChanges) minermodel.MinerSectorEventList {
out := make(minermodel.MinerSectorEventList, len(preCommits.Added))
// track precommit addition
for i, add := range preCommits.Added {
out[i] = &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: uint64(add.Info.SectorNumber),
Event: minermodel.PreCommitAdded,
}
Expand All @@ -153,20 +159,15 @@ func extractMinerPreCommitEvents(a actorstate.ActorInfo, preCommits *miner.PreCo
return out
}

func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec *MinerStateExtractionContext) (minermodel.MinerSectorEventList, error) {
_, span := otel.Tracer("").Start(ctx, "extractMinerPartitionDiff") // nolint: ineffassign,staticcheck
func ExtractMinerPartitionsDiff(ctx context.Context, extState extraction.State, dlDiff miner.DeadlinesDiff) (minermodel.MinerSectorEventList, error) {
_, span := otel.Tracer("").Start(ctx, "ExtractMinerPartitionsDiff") // nolint: ineffassign,staticcheck
defer span.End()

// short circuit genesis state.
if !ec.HasPreviousState() {
if extState.ParentState() == nil {
return nil, nil
}

dlDiff, err := miner.DiffDeadlines(ec.PrevState, ec.CurrState)
if err != nil {
return nil, err
}

if dlDiff == nil {
return nil, nil
}
Expand All @@ -176,6 +177,7 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
recovered := bitfield.New()
recovering := bitfield.New()

var err error
for _, deadline := range dlDiff {
for _, partition := range deadline {
removed, err = bitfield.MergeBitFields(removed, partition.Removed)
Expand All @@ -198,7 +200,7 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
}
// build an index of removed sector expiration's for comparison below.

removedSectors, err := ec.CurrState.LoadSectors(&removed)
removedSectors, err := extState.CurrentState().LoadSectors(&removed)
if err != nil {
return nil, fmt.Errorf("fetching miners removed sectors: %w", err)
}
Expand All @@ -213,13 +215,13 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
if err := removed.ForEach(func(u uint64) error {
event := minermodel.SectorTerminated
expiration := rmExpireIndex[u]
if expiration == a.Current.Height() {
if expiration == extState.CurrentTipSet().Height() {
event = minermodel.SectorExpired
}
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: u,
Event: event,
})
Expand All @@ -231,9 +233,9 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
// track recovering sectors
if err := recovering.ForEach(func(u uint64) error {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: u,
Event: minermodel.SectorRecovering,
})
Expand All @@ -245,9 +247,9 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
// track faulted sectors
if err := faulted.ForEach(func(u uint64) error {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: u,
Event: minermodel.SectorFaulted,
})
Expand All @@ -259,9 +261,9 @@ func extractMinerPartitionsDiff(ctx context.Context, a actorstate.ActorInfo, ec
// track recovered sectors
if err := recovered.ForEach(func(u uint64) error {
out = append(out, &minermodel.MinerSectorEvent{
Height: int64(a.Current.Height()),
MinerID: a.Address.String(),
StateRoot: a.Current.ParentState().String(),
Height: int64(extState.CurrentTipSet().Height()),
MinerID: extState.Address().String(),
StateRoot: extState.CurrentTipSet().ParentState().String(),
SectorID: u,
Event: minermodel.SectorRecovered,
})
Expand Down
Loading