Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ actors-gen:

.PHONY: tasks-gen
tasks-gen:
go run ./chain/indexer/tablegen
go run ./chain/indexer/tasktype/tablegen
go fmt ./...

.PHONY: itest
Expand Down
5 changes: 1 addition & 4 deletions chain/tipset.go → chain/cache/tipset.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package chain
package cache

import (
"context"
"errors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)

var log = logging.Logger("lily/chain")

var (
ErrCacheEmpty = errors.New("cache empty")
ErrAddOutOfOrder = errors.New("added tipset height lower than current head")
Expand Down
2 changes: 1 addition & 1 deletion chain/tipset_test.go → chain/cache/tipset_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chain
package cache

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion chain/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"gopkg.in/cheggaaa/pb.v1"
)

var log = logging.Logger("lily/chain")
var log = logging.Logger("lily/chain/export")

type ChainExporter struct {
store blockstore.Blockstore // blockstore chain is exported from
Expand Down
155 changes: 0 additions & 155 deletions chain/fill.go

This file was deleted.

98 changes: 98 additions & 0 deletions chain/gap/fill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package gap

import (
"context"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lily/chain/datasource"
"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/integrated"
"github.com/filecoin-project/lily/lens"
"github.com/filecoin-project/lily/storage"
)

var log = logging.Logger("lily/chain/gap")

type Filler struct {
DB *storage.Database
node lens.API
name string
minHeight, maxHeight uint64
tasks []string
done chan struct{}
}

func NewFiller(node lens.API, db *storage.Database, name string, minHeight, maxHeight uint64, tasks []string) *Filler {
return &Filler{
DB: db,
node: node,
name: name,
maxHeight: maxHeight,
minHeight: minHeight,
tasks: tasks,
}
}

func (g *Filler) Run(ctx context.Context) error {
// init the done channel for each run since jobs may be started and stopped.
g.done = make(chan struct{})
defer close(g.done)

gaps, heights, err := g.DB.ConsolidateGaps(ctx, g.minHeight, g.maxHeight, g.tasks...)
if err != nil {
return err
}
fillStart := time.Now()
log.Infow("gap fill start", "start", fillStart.String(), "total_epoch_gaps", len(gaps), "from", g.minHeight, "to", g.maxHeight, "task", g.tasks, "reporter", g.name)

taskAPI, err := datasource.NewDataSource(g.node)
if err != nil {
return err
}

index, err := integrated.NewManager(taskAPI, g.DB, g.name)
if err != nil {
return err
}

for _, height := range heights {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
runStart := time.Now()

log.Infow("filling gap", "height", heights, "reporter", g.name)

ts, err := g.node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height), types.EmptyTSK)
if err != nil {
return err
}

log.Infof("got tipset for height %d, tipset height %d", heights, ts.Height())
if success, err := index.TipSet(ctx, ts, indexer.WithTasks(gaps[height])); err != nil {
log.Errorw("fill indexing encountered fatal error", "height", height, "tipset", ts.Key().String(), "error", err, "tasks", gaps[height], "reporter", g.name)
return err
} else if !success {
log.Errorw("fill indexing failed to successfully index tipset, skipping fill for tipset, gap remains", "height", height, "tipset", ts.Key().String(), "tasks", gaps[height], "reporter", g.name)
continue
}
log.Infow("fill success", "epoch", ts.Height(), "tasks_filled", gaps[height], "duration", time.Since(runStart), "reporter", g.name)

if err := g.DB.SetGapsFilled(ctx, height, gaps[height]...); err != nil {
return err
}
}
log.Infow("gap fill complete", "duration", time.Since(fillStart), "total_epoch_gaps", len(gaps), "from", g.minHeight, "to", g.maxHeight, "task", g.tasks, "reporter", g.name)

return nil
}

func (g *Filler) Done() <-chan struct{} {
return g.done
}
14 changes: 7 additions & 7 deletions chain/find.go → chain/gap/find.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chain
package gap

import (
"context"
Expand All @@ -12,7 +12,7 @@ import (
"github.com/filecoin-project/lily/storage"
)

type GapIndexer struct {
type Finder struct {
DB *storage.Database
node lens.API
name string
Expand All @@ -21,8 +21,8 @@ type GapIndexer struct {
done chan struct{}
}

func NewGapIndexer(node lens.API, db *storage.Database, name string, minHeight, maxHeight uint64, tasks []string) *GapIndexer {
return &GapIndexer{
func NewFinder(node lens.API, db *storage.Database, name string, minHeight, maxHeight uint64, tasks []string) *Finder {
return &Finder{
DB: db,
node: node,
name: name,
Expand All @@ -38,7 +38,7 @@ type TaskHeight struct {
Status string
}

func (g *GapIndexer) Find(ctx context.Context) (visor.GapReportList, error) {
func (g *Finder) Find(ctx context.Context) (visor.GapReportList, error) {
log.Debug("finding task epoch gaps")
start := time.Now()
var result []TaskHeight
Expand Down Expand Up @@ -80,7 +80,7 @@ SELECT * FROM gap_find(?,?,?,?,?);
return out, nil
}

func (g *GapIndexer) Run(ctx context.Context) error {
func (g *Finder) Run(ctx context.Context) error {
// init the done channel for each run since jobs may be started and stopped.
g.done = make(chan struct{})
defer close(g.done)
Expand All @@ -101,6 +101,6 @@ func (g *GapIndexer) Run(ctx context.Context) error {
return g.DB.PersistBatch(ctx, gaps)
}

func (g *GapIndexer) Done() <-chan struct{} {
func (g *Finder) Done() <-chan struct{} {
return g.done
}
Loading