Binary file added .config.toml.swp
Binary file added .docker-compose.yml.swp
98 changes: 20 additions & 78 deletions chain/indexer/distributed/catalog.go
@@ -1,18 +1,12 @@
package distributed

import (
"context"
"encoding/json"
"fmt"
"os"

logging "github.com/ipfs/go-log/v2"
"go.uber.org/atomic"

"github.com/hibiken/asynq"
"go.opentelemetry.io/otel/trace"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
"github.com/filecoin-project/lily/config"
)

@@ -30,7 +24,7 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
if _, exists := c.servers[name]; exists {
return nil, fmt.Errorf("duplicate queue name: %q", name)
}
log.Infow("registering worker queue config", "name", name, "type", "redis")
log.Infow("registering worker queue config", "name", name, "type", "redis", "addr", sc.RedisConfig.Addr)

// Find the password of the queue, which is either indirectly specified using PasswordEnv or explicit via Password.
// TODO use github.com/kelseyhightower/envconfig
@@ -42,34 +36,29 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
}

c.servers[name] = &TipSetWorker{
server: asynq.NewServer(
asynq.RedisClientOpt{
Network: sc.RedisConfig.Network,
Addr: sc.RedisConfig.Addr,
Username: sc.RedisConfig.Username,
Password: queuePassword,
DB: sc.RedisConfig.DB,
PoolSize: sc.RedisConfig.PoolSize,
},
asynq.Config{
LogLevel: sc.WorkerConfig.LogLevel(),
Queues: sc.WorkerConfig.Queues(),
ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout,
Concurrency: sc.WorkerConfig.Concurrency,
StrictPriority: sc.WorkerConfig.StrictPriority,
Logger: log.With("worker", name),
ErrorHandler: &QueueErrorHandler{},
},
),
running: atomic.NewBool(false),
RedisConfig: asynq.RedisClientOpt{
Network: sc.RedisConfig.Network,
Addr: sc.RedisConfig.Addr,
Username: sc.RedisConfig.Username,
Password: queuePassword,
DB: sc.RedisConfig.DB,
PoolSize: sc.RedisConfig.PoolSize,
},
ServerConfig: asynq.Config{
LogLevel: sc.WorkerConfig.LogLevel(),
Queues: sc.WorkerConfig.Queues(),
ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout,
Concurrency: sc.WorkerConfig.Concurrency,
StrictPriority: sc.WorkerConfig.StrictPriority,
},
}
}

for name, cc := range cfg.Notifiers {
if _, exists := c.servers[name]; exists {
return nil, fmt.Errorf("duplicate queue name: %q", name)
}
log.Infow("registering notifier queue config", "name", name, "type", "redis")
log.Infow("registering notifier queue config", "name", name, "type", "redis", "addr", cc.Addr)

// Find the password of the queue, which is either indirectly specified using PasswordEnv or explicit via Password.
// TODO use github.com/kelseyhightower/envconfig
@@ -95,24 +84,8 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
}

type TipSetWorker struct {
server *asynq.Server
running *atomic.Bool
}

func (w *TipSetWorker) Running() bool {
return w.running.Load()
}

func (w *TipSetWorker) Run(mux *asynq.ServeMux) error {
if w.running.Load() {
return fmt.Errorf("server already running")
}
w.running.Swap(true)
return w.server.Run(mux)
}

func (w *TipSetWorker) Shutdown() {
w.server.Shutdown()
RedisConfig asynq.RedisClientOpt
ServerConfig asynq.Config
}

// Catalog contains a map of workers and clients
@@ -149,34 +122,3 @@ func (c *Catalog) Notifier(name string) (*asynq.Client, error) {
}
return client, nil
}

type QueueErrorHandler struct{}

func (w *QueueErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) {
switch task.Type() {
case tasks.TypeIndexTipSet:
var p tasks.IndexTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
case tasks.TypeGapFillTipSet:
var p tasks.GapFillTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
}
}
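
Note on the catalog change above: TipSetWorker no longer wraps a live asynq.Server or tracks running state; the catalog now only records the Redis connection options and server configuration for each named worker queue, and whoever runs the worker builds the server itself. A minimal sketch of how a caller might consume the new struct, assuming a catalog accessor for worker queues (the accessor name and queue name are illustrative, not part of this diff):

```go
package example

import (
	"github.com/hibiken/asynq"

	"github.com/filecoin-project/lily/chain/indexer/distributed"
)

// runWorkerQueue is a sketch only: the Worker accessor and the queue name
// "Worker1" are assumptions for illustration; the field names match the diff.
func runWorkerQueue(catalog *distributed.Catalog, mux *asynq.ServeMux) error {
	tsw, err := catalog.Worker("Worker1") // hypothetical accessor
	if err != nil {
		return err
	}
	// The caller now builds and owns the asynq server; the catalog only
	// supplies connection options and server configuration.
	srv := asynq.NewServer(tsw.RedisConfig, tsw.ServerConfig)
	return srv.Run(mux) // blocks until shutdown is signalled
}
```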
57 changes: 48 additions & 9 deletions chain/indexer/distributed/queue/worker.go
@@ -2,24 +2,32 @@ package queue

import (
"context"
"encoding/json"

"github.com/hibiken/asynq"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel/trace"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/distributed"
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
"github.com/filecoin-project/lily/storage"
)

var log = logging.Logger("lily/distributed/worker")

type AsynqWorker struct {
done chan struct{}
done chan struct{}

name string
server *distributed.TipSetWorker
index indexer.Indexer
db *storage.Database
}

func NewAsynqWorker(i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker {
func NewAsynqWorker(name string, i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker {
return &AsynqWorker{
name: name,
server: server,
index: i,
db: db,
@@ -34,18 +42,49 @@ func (t *AsynqWorker) Run(ctx context.Context) error {
mux.HandleFunc(tasks.TypeIndexTipSet, tasks.NewIndexHandler(t.index).HandleIndexTipSetTask)
mux.HandleFunc(tasks.TypeGapFillTipSet, tasks.NewGapFillHandler(t.index, t.db).HandleGapFillTipSetTask)

if err := t.server.Run(mux); err != nil {
t.server.ServerConfig.Logger = log.With("name", t.name)
t.server.ServerConfig.ErrorHandler = &WorkerErrorHandler{}

server := asynq.NewServer(t.server.RedisConfig, t.server.ServerConfig)
if err := server.Start(mux); err != nil {
return err
}

go func() {
<-ctx.Done()
t.server.Shutdown()
}()

<-ctx.Done()
server.Shutdown()
return nil
}

func (t *AsynqWorker) Done() <-chan struct{} {
return t.done
}

type WorkerErrorHandler struct{}

func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) {
switch task.Type() {
case tasks.TypeIndexTipSet:
var p tasks.IndexTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
case tasks.TypeGapFillTipSet:
var p tasks.GapFillTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
}
}
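
With server construction moved into AsynqWorker.Run, the worker injects its own logger and error handler into the catalog-supplied ServerConfig, starts the asynq server, and shuts it down when the job's context is cancelled. A rough usage sketch under those assumptions (the wiring is illustrative; lily normally drives this through its job scheduler):

```go
package example

import (
	"context"

	"github.com/filecoin-project/lily/chain/indexer"
	"github.com/filecoin-project/lily/chain/indexer/distributed"
	"github.com/filecoin-project/lily/chain/indexer/distributed/queue"
	"github.com/filecoin-project/lily/storage"
)

// startWorker is a sketch: idx, db and tsw stand in for the values lily's
// scheduler would normally supply; Run blocks until ctx is cancelled.
func startWorker(ctx context.Context, idx indexer.Indexer, db *storage.Database, tsw *distributed.TipSetWorker) error {
	w := queue.NewAsynqWorker("tipset-worker", idx, db, tsw)
	return w.Run(ctx)
}
```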
14 changes: 3 additions & 11 deletions commands/job/worker.go
@@ -11,20 +11,13 @@ import (
)

var tipsetWorkerFlags struct {
queue string
concurrency int
queue string
}

var TipSetWorkerCmd = &cli.Command{
Name: "tipset-worker",
Usage: "start a tipset-worker that consumes tasks from the provided queuing system and performs indexing",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "concurrency",
Usage: "Concurrency sets the maximum number of concurrent processing of tasks. If set to a zero or negative value it will be set to the number of CPUs usable by the current process.",
Value: 1,
Destination: &tipsetWorkerFlags.concurrency,
},
&cli.StringFlag{
Name: "queue",
Usage: "Name of queue system worker will consume work from.",
@@ -43,9 +36,8 @@ var TipSetWorkerCmd = &cli.Command{
defer closer()

res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{
JobConfig: RunFlags.ParseJobConfig("tipset-worker"),
Queue: tipsetWorkerFlags.queue,
Concurrency: tipsetWorkerFlags.concurrency,
JobConfig: RunFlags.ParseJobConfig("tipset-worker"),
Queue: tipsetWorkerFlags.queue,
})
if err != nil {
return err
2 changes: 1 addition & 1 deletion go.mod
@@ -68,7 +68,6 @@ require (
github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01
github.com/jedib0t/go-pretty/v6 v6.2.7
go.opentelemetry.io/otel/trace v1.3.0
go.uber.org/atomic v1.9.0
)

require (
@@ -335,6 +334,7 @@ require (
github.com/zondax/ledger-go v0.12.1 // indirect
go.opentelemetry.io/otel/metric v0.25.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/dig v1.12.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
4 changes: 0 additions & 4 deletions lens/lily/api.go
@@ -143,10 +143,6 @@ type LilyTipSetWorkerConfig struct {

// Queue is the name of the queueing system the worker will consume work from.
Queue string
// Concurrency sets the maximum number of concurrent processing of tasks.
// If set to a zero or negative value, NewServer will overwrite the value
// to the number of CPUs usable by the current process.
Concurrency int
}

type LilySurveyConfig struct {
11 changes: 3 additions & 8 deletions lens/lily/impl.go
@@ -90,10 +90,6 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
return nil, err
}

if worker.Running() {
return nil, fmt.Errorf("worker %s already running", cfg.Queue)
}

taskAPI, err := datasource.NewDataSource(m)
if err != nil {
return nil, err
@@ -113,11 +109,10 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
Name: cfg.JobConfig.Name,
Type: "tipset-worker",
Params: map[string]string{
"queue": cfg.Queue,
"storage": cfg.JobConfig.Storage,
"concurrency": strconv.Itoa(cfg.Concurrency),
"queue": cfg.Queue,
"storage": cfg.JobConfig.Storage,
},
Job: queue.NewAsynqWorker(im, db, worker),
Job: queue.NewAsynqWorker(cfg.JobConfig.Name, im, db, worker),
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
RestartDelay: cfg.JobConfig.RestartDelay,
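
With the Concurrency field removed from LilyTipSetWorkerConfig, starting a tipset worker takes only a job config and a queue name; concurrency, queue priorities and shutdown timeout now come from that queue's entry in the daemon's worker configuration. A sketch of the reduced call, mirroring commands/job/worker.go above (the queue name is illustrative):

```go
// Sketch: "Worker1" is an illustrative queue name that must match a worker
// queue registered in the daemon's queue config.
res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{
	JobConfig: RunFlags.ParseJobConfig("tipset-worker"),
	Queue:     "Worker1",
})
if err != nil {
	return err
}
```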
1 change: 1 addition & 0 deletions tasks/ipfs/task.go
@@ -0,0 +1 @@
package ipfs