diff --git a/chain/indexer/distributed/catalog.go b/chain/indexer/distributed/catalog.go index 0bb833301..ac5748357 100644 --- a/chain/indexer/distributed/catalog.go +++ b/chain/indexer/distributed/catalog.go @@ -1,18 +1,12 @@ package distributed import ( - "context" - "encoding/json" "fmt" "os" - logging "github.com/ipfs/go-log/v2" - "go.uber.org/atomic" - "github.com/hibiken/asynq" - "go.opentelemetry.io/otel/trace" + logging "github.com/ipfs/go-log/v2" - "github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks" "github.com/filecoin-project/lily/config" ) @@ -30,7 +24,7 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) { if _, exists := c.servers[name]; exists { return nil, fmt.Errorf("duplicate queue name: %q", name) } - log.Infow("registering worker queue config", "name", name, "type", "redis") + log.Infow("registering worker queue config", "name", name, "type", "redis", "addr", sc.RedisConfig.Addr) // Find the password of the queue, which is either indirectly specified using PasswordEnv or explicit via Password. 
// TODO use github.com/kelseyhightower/envconfig @@ -42,26 +36,21 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) { } c.servers[name] = &TipSetWorker{ - server: asynq.NewServer( - asynq.RedisClientOpt{ - Network: sc.RedisConfig.Network, - Addr: sc.RedisConfig.Addr, - Username: sc.RedisConfig.Username, - Password: queuePassword, - DB: sc.RedisConfig.DB, - PoolSize: sc.RedisConfig.PoolSize, - }, - asynq.Config{ - LogLevel: sc.WorkerConfig.LogLevel(), - Queues: sc.WorkerConfig.Queues(), - ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout, - Concurrency: sc.WorkerConfig.Concurrency, - StrictPriority: sc.WorkerConfig.StrictPriority, - Logger: log.With("worker", name), - ErrorHandler: &QueueErrorHandler{}, - }, - ), - running: atomic.NewBool(false), + RedisConfig: asynq.RedisClientOpt{ + Network: sc.RedisConfig.Network, + Addr: sc.RedisConfig.Addr, + Username: sc.RedisConfig.Username, + Password: queuePassword, + DB: sc.RedisConfig.DB, + PoolSize: sc.RedisConfig.PoolSize, + }, + ServerConfig: asynq.Config{ + LogLevel: sc.WorkerConfig.LogLevel(), + Queues: sc.WorkerConfig.Queues(), + ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout, + Concurrency: sc.WorkerConfig.Concurrency, + StrictPriority: sc.WorkerConfig.StrictPriority, + }, } } @@ -69,7 +58,7 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) { if _, exists := c.servers[name]; exists { return nil, fmt.Errorf("duplicate queue name: %q", name) } - log.Infow("registering notifier queue config", "name", name, "type", "redis") + log.Infow("registering notifier queue config", "name", name, "type", "redis", "addr", cc.Addr) // Find the password of the queue, which is either indirectly specified using PasswordEnv or explicit via Password. 
// TODO use github.com/kelseyhightower/envconfig @@ -95,24 +84,8 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) { } type TipSetWorker struct { - server *asynq.Server - running *atomic.Bool -} - -func (w *TipSetWorker) Running() bool { - return w.running.Load() -} - -func (w *TipSetWorker) Run(mux *asynq.ServeMux) error { - if w.running.Load() { - return fmt.Errorf("server already running") - } - w.running.Swap(true) - return w.server.Run(mux) -} - -func (w *TipSetWorker) Shutdown() { - w.server.Shutdown() + RedisConfig asynq.RedisClientOpt + ServerConfig asynq.Config } // Catalog contains a map of workers and clients @@ -149,34 +122,3 @@ func (c *Catalog) Notifier(name string) (*asynq.Client, error) { } return client, nil } - -type QueueErrorHandler struct{} - -func (w *QueueErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) { - switch task.Type() { - case tasks.TypeIndexTipSet: - var p tasks.IndexTipSetPayload - if err := json.Unmarshal(task.Payload(), &p); err != nil { - log.Errorw("failed to decode task type (developer error?)", "error", err) - } - if p.HasTraceCarrier() { - if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() { - ctx = trace.ContextWithRemoteSpanContext(ctx, sc) - trace.SpanFromContext(ctx).RecordError(err) - } - } - log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err) - case tasks.TypeGapFillTipSet: - var p tasks.GapFillTipSetPayload - if err := json.Unmarshal(task.Payload(), &p); err != nil { - log.Errorw("failed to decode task type (developer error?)", "error", err) - } - if p.HasTraceCarrier() { - if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() { - ctx = trace.ContextWithRemoteSpanContext(ctx, sc) - trace.SpanFromContext(ctx).RecordError(err) - } - } - log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err) - } -} diff 
--git a/chain/indexer/distributed/queue/worker.go b/chain/indexer/distributed/queue/worker.go index 1b02fa485..87022dc8f 100644 --- a/chain/indexer/distributed/queue/worker.go +++ b/chain/indexer/distributed/queue/worker.go @@ -2,8 +2,11 @@ package queue import ( "context" + "encoding/json" "github.com/hibiken/asynq" + logging "github.com/ipfs/go-log/v2" + "go.opentelemetry.io/otel/trace" "github.com/filecoin-project/lily/chain/indexer" "github.com/filecoin-project/lily/chain/indexer/distributed" @@ -11,15 +14,20 @@ import ( "github.com/filecoin-project/lily/storage" ) +var log = logging.Logger("lily/distributed/worker") + type AsynqWorker struct { - done chan struct{} + done chan struct{} + + name string server *distributed.TipSetWorker index indexer.Indexer db *storage.Database } -func NewAsynqWorker(i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker { +func NewAsynqWorker(name string, i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker { return &AsynqWorker{ + name: name, server: server, index: i, db: db, @@ -34,18 +42,49 @@ func (t *AsynqWorker) Run(ctx context.Context) error { mux.HandleFunc(tasks.TypeIndexTipSet, tasks.NewIndexHandler(t.index).HandleIndexTipSetTask) mux.HandleFunc(tasks.TypeGapFillTipSet, tasks.NewGapFillHandler(t.index, t.db).HandleGapFillTipSetTask) - if err := t.server.Run(mux); err != nil { + t.server.ServerConfig.Logger = log.With("name", t.name) + t.server.ServerConfig.ErrorHandler = &WorkerErrorHandler{} + + server := asynq.NewServer(t.server.RedisConfig, t.server.ServerConfig) + if err := server.Start(mux); err != nil { return err } - - go func() { - <-ctx.Done() - t.server.Shutdown() - }() - + <-ctx.Done() + server.Shutdown() return nil } func (t *AsynqWorker) Done() <-chan struct{} { return t.done } + +type WorkerErrorHandler struct{} + +func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) { + switch task.Type() { + case 
tasks.TypeIndexTipSet: + var p tasks.IndexTipSetPayload + if err := json.Unmarshal(task.Payload(), &p); err != nil { + log.Errorw("failed to decode task type (developer error?)", "error", err) + } + if p.HasTraceCarrier() { + if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() { + ctx = trace.ContextWithRemoteSpanContext(ctx, sc) + trace.SpanFromContext(ctx).RecordError(err) + } + } + log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err) + case tasks.TypeGapFillTipSet: + var p tasks.GapFillTipSetPayload + if err := json.Unmarshal(task.Payload(), &p); err != nil { + log.Errorw("failed to decode task type (developer error?)", "error", err) + } + if p.HasTraceCarrier() { + if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() { + ctx = trace.ContextWithRemoteSpanContext(ctx, sc) + trace.SpanFromContext(ctx).RecordError(err) + } + } + log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err) + } +} diff --git a/commands/job/worker.go b/commands/job/worker.go index 97052d355..24bf77a7d 100644 --- a/commands/job/worker.go +++ b/commands/job/worker.go @@ -11,20 +11,13 @@ import ( ) var tipsetWorkerFlags struct { - queue string - concurrency int + queue string } var TipSetWorkerCmd = &cli.Command{ Name: "tipset-worker", Usage: "start a tipset-worker that consumes tasks from the provided queuing system and performs indexing", Flags: []cli.Flag{ - &cli.IntFlag{ - Name: "concurrency", - Usage: "Concurrency sets the maximum number of concurrent processing of tasks. 
If set to a zero or negative value it will be set to the number of CPUs usable by the current process.", - Value: 1, - Destination: &tipsetWorkerFlags.concurrency, - }, &cli.StringFlag{ Name: "queue", Usage: "Name of queue system worker will consume work from.", @@ -43,9 +36,8 @@ var TipSetWorkerCmd = &cli.Command{ defer closer() res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{ - JobConfig: RunFlags.ParseJobConfig("tipset-worker"), - Queue: tipsetWorkerFlags.queue, - Concurrency: tipsetWorkerFlags.concurrency, + JobConfig: RunFlags.ParseJobConfig("tipset-worker"), + Queue: tipsetWorkerFlags.queue, }) if err != nil { return err diff --git a/go.mod b/go.mod index 686bac1e5..ea915b165 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,6 @@ require ( github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01 github.com/jedib0t/go-pretty/v6 v6.2.7 go.opentelemetry.io/otel/trace v1.3.0 - go.uber.org/atomic v1.9.0 ) require ( @@ -335,6 +334,7 @@ require ( github.com/zondax/ledger-go v0.12.1 // indirect go.opentelemetry.io/otel/metric v0.25.0 // indirect go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect + go.uber.org/atomic v1.9.0 // indirect go.uber.org/dig v1.12.0 // indirect go4.org v0.0.0-20200411211856-f5505b9728dd // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect diff --git a/lens/lily/api.go b/lens/lily/api.go index 023a6e226..fc1f2094e 100644 --- a/lens/lily/api.go +++ b/lens/lily/api.go @@ -143,10 +143,6 @@ type LilyTipSetWorkerConfig struct { // Queue is the name of the queueing system the worker will consume work from. Queue string - // Concurrency sets the maximum number of concurrent processing of tasks. - // If set to a zero or negative value, NewServer will overwrite the value - // to the number of CPUs usable by the current process. 
- Concurrency int } type LilySurveyConfig struct { diff --git a/lens/lily/impl.go b/lens/lily/impl.go index ce6b29215..635071138 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -90,10 +90,6 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker return nil, err } - if worker.Running() { - return nil, fmt.Errorf("worker %s already running", cfg.Queue) - } - taskAPI, err := datasource.NewDataSource(m) if err != nil { return nil, err @@ -113,11 +109,10 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker Name: cfg.JobConfig.Name, Type: "tipset-worker", Params: map[string]string{ - "queue": cfg.Queue, - "storage": cfg.JobConfig.Storage, - "concurrency": strconv.Itoa(cfg.Concurrency), + "queue": cfg.Queue, + "storage": cfg.JobConfig.Storage, }, - Job: queue.NewAsynqWorker(im, db, worker), + Job: queue.NewAsynqWorker(cfg.JobConfig.Name, im, db, worker), RestartOnFailure: cfg.JobConfig.RestartOnFailure, RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, RestartDelay: cfg.JobConfig.RestartDelay, diff --git a/tasks/ipfs/task.go b/tasks/ipfs/task.go new file mode 100644 index 000000000..a0c59fc20 --- /dev/null +++ b/tasks/ipfs/task.go @@ -0,0 +1 @@ +package ipfs