go get github.com/ankorstore/yokai/fxworker
This module provides a workers pool to your Fx application with:
- automatic panic recovery
- automatic logging
- automatic metrics
- possibility to defer workers
- possibility to limit workers max execution attempts
This module is intended to be used alongside:
- the fxconfig module
- the fxlog module
- the fxtrace module
- the fxmetrics module
- the fxgenerate module
To load the module in your Fx application:
package main
import (
"context"
"github.com/ankorstore/yokai/fxconfig"
"github.com/ankorstore/yokai/fxgenerate"
"github.com/ankorstore/yokai/fxlog"
"github.com/ankorstore/yokai/fxmetrics"
"github.com/ankorstore/yokai/fxtrace"
"github.com/ankorstore/yokai/fxworker"
"github.com/ankorstore/yokai/worker"
"go.uber.org/fx"
)
func main() {
fx.New(
fxconfig.FxConfigModule, // load the module dependencies
fxlog.FxLogModule,
fxtrace.FxTraceModule,
fxtrace.FxTraceModule,
fxmetrics.FxMetricsModule,
fxgenerate.FxGenerateModule,
fxworker.FxWorkerModule, // load the module
fx.Invoke(func(pool *worker.WorkerPool) {
pool.Start(context.Background()) // start the workers pool
}),
).Run()
}
Configuration reference:
# ./configs/config.yaml
app:
name: app
env: dev
version: 0.1.0
debug: true
modules:
worker:
defer: 0.1 # threshold in seconds to wait before starting all workers, immediate start by default
attempts: 3 # max execution attempts in case of failures for all workers, no restart by default
metrics:
collect:
enabled: true # to collect metrics about workers executions
namespace: foo # workers metrics namespace (empty by default)
subsystem: bar # workers metrics subsystem (empty by default)
Notes:
- the workers logging will be based on the fxlog module configuration
- the workers tracing will be based on the fxtrace module configuration
This module provides the possibility to register several Worker implementations, with optional WorkerExecutionOption.
They will be then collected and given by Fx to the WorkerPool, made available in the Fx container.
This is done via the AsWorker()
function:
package main
import (
"context"
"github.com/ankorstore/yokai/fxconfig"
"github.com/ankorstore/yokai/fxgenerate"
"github.com/ankorstore/yokai/fxlog"
"github.com/ankorstore/yokai/fxmetrics"
"github.com/ankorstore/yokai/fxtrace"
"github.com/ankorstore/yokai/fxworker"
"github.com/ankorstore/yokai/worker"
"go.uber.org/fx"
)
type ExampleWorker struct{}
func NewExampleWorker() *ExampleWorker {
return &ExampleWorker{}
}
func (w *ExampleWorker) Name() string {
return "example-worker"
}
func (w *ExampleWorker) Run(ctx context.Context) error {
worker.CtxLogger(ctx).Info().Msg("run")
return nil
}
func main() {
fx.New(
fxconfig.FxConfigModule, // load the module dependencies
fxlog.FxLogModule,
fxtrace.FxTraceModule,
fxtrace.FxTraceModule,
fxmetrics.FxMetricsModule,
fxgenerate.FxGenerateModule,
fxworker.FxWorkerModule, // load the module
fx.Provide(
fxworker.AsWorker(
NewExampleWorker, // register the ExampleWorker
worker.WithDeferredStartThreshold(1), // with a deferred start threshold of 1 second
worker.WithMaxExecutionsAttempts(2), // and 2 max execution attempts
),
),
fx.Invoke(func(pool *worker.WorkerPool) {
pool.Start(context.Background()) // start the workers pool
}),
).Run()
}
To get more details about the features made available for your workers (contextual logging, tracing, etc.), check the worker module documentation.
By default, the worker.WorkerPool
is created by
the DefaultWorkerPoolFactory.
If needed, you can provide your own factory and override the module:
package main
import (
"context"
"github.com/ankorstore/yokai/fxconfig"
"github.com/ankorstore/yokai/fxgenerate"
"github.com/ankorstore/yokai/fxhealthcheck"
"github.com/ankorstore/yokai/fxlog"
"github.com/ankorstore/yokai/fxmetrics"
"github.com/ankorstore/yokai/fxtrace"
"github.com/ankorstore/yokai/fxworker"
"github.com/ankorstore/yokai/healthcheck"
"github.com/ankorstore/yokai/worker"
"go.uber.org/fx"
)
type CustomWorkerPoolFactory struct{}
func NewCustomWorkerPoolFactory() worker.WorkerPoolFactory {
return &CustomWorkerPoolFactory{}
}
func (f *CustomWorkerPoolFactory) Create(options ...worker.WorkerPoolOption) (*worker.WorkerPool, error) {
return &worker.WorkerPool{...}, nil
}
func main() {
fx.New(
fxconfig.FxConfigModule, // load the module dependencies
fxlog.FxLogModule,
fxtrace.FxTraceModule,
fxtrace.FxTraceModule,
fxmetrics.FxMetricsModule,
fxgenerate.FxGenerateModule,
fxworker.FxWorkerModule, // load the module
fx.Decorate(NewCustomWorkerPoolFactory), // override the module with a custom factory
fx.Invoke(func(pool *worker.WorkerPool) {
pool.Start(context.Background()) // start the custom worker pool
}),
).Run()
}