Fx Worker Module

Fx module for worker.

Installation

go get github.com/ankorstore/yokai/fxworker

Features

This module provides a worker pool to your Fx application, with:

  • automatic panic recovery
  • automatic logging
  • automatic metrics
  • optional deferred start of workers
  • optional limit on the maximum number of execution attempts per worker
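
To make the panic recovery and max attempts features more concrete, here is a minimal, standard-library-only sketch of the general technique. It is not the module's actual implementation: runFunc, safeRun, runWithAttempts and demo are hypothetical names introduced for illustration only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runFunc is a hypothetical stand-in for a worker's Run method.
type runFunc func(ctx context.Context) error

// safeRun executes run once and converts a panic into an error,
// so one failing worker cannot crash the whole process.
func safeRun(ctx context.Context, run runFunc) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()

	return run(ctx)
}

// runWithAttempts calls safeRun until it succeeds or maxAttempts is reached.
// It returns the number of attempts made and the last error, if any.
func runWithAttempts(ctx context.Context, run runFunc, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1 // always run at least once
	}

	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = safeRun(ctx, run); err == nil {
			return attempt, nil
		}
	}

	return maxAttempts, err
}

// demo runs a flaky function that panics, then fails, then succeeds.
func demo() (int, error) {
	calls := 0
	flaky := func(ctx context.Context) error {
		calls++
		switch calls {
		case 1:
			panic("boom")
		case 2:
			return errors.New("temporary failure")
		default:
			return nil
		}
	}

	return runWithAttempts(context.Background(), flaky, 3)
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```

In this sketch a panic and a returned error are treated the same way: both count as a failed attempt. Whether the real pool makes the same choice should be checked against the worker module documentation.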

Documentation

Dependencies

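This module is intended to be used alongside:

  • the fxconfig module
  • the fxlog module
  • the fxtrace module
  • the fxmetrics module
  • the fxgenerate module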

Loading

To load the module in your Fx application:

package main

import (
  "context"

  "github.com/ankorstore/yokai/fxconfig"
  "github.com/ankorstore/yokai/fxgenerate"
  "github.com/ankorstore/yokai/fxlog"
  "github.com/ankorstore/yokai/fxmetrics"
  "github.com/ankorstore/yokai/fxtrace"
  "github.com/ankorstore/yokai/fxworker"
  "github.com/ankorstore/yokai/worker"
  "go.uber.org/fx"
)

func main() {
  fx.New(
    fxconfig.FxConfigModule,                    // load the module dependencies
    fxlog.FxLogModule,
    fxtrace.FxTraceModule,
    fxmetrics.FxMetricsModule,
    fxgenerate.FxGenerateModule,
    fxworker.FxWorkerModule,                    // load the module
    fx.Invoke(func(pool *worker.WorkerPool) {
      pool.Start(context.Background())          // start the workers pool
    }),
  ).Run()
}

Configuration

Configuration reference:

# ./configs/config.yaml
app:
  name: app
  env: dev
  version: 0.1.0
  debug: true
modules:
  worker:
    defer: 0.1             # threshold in seconds to wait before starting all workers, immediate start by default
    attempts: 3            # max execution attempts in case of failures for all workers, no restart by default
    metrics:
      collect:
        enabled: true      # to collect metrics about workers executions
        namespace: foo     # workers metrics namespace (empty by default)
        subsystem: bar     # workers metrics subsystem (empty by default)

Notes:

  • worker logging is based on the fxlog module configuration
  • worker tracing is based on the fxtrace module configuration
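
The defer value is expressed in seconds and may be fractional (0.1 above). As a rough illustration of how such a value can be mapped to a Go duration, here is a small sketch. The deferDuration helper is hypothetical and is not part of the module; how the module performs the conversion internally is an assumption here.

```go
package main

import (
	"fmt"
	"time"
)

// deferDuration converts a threshold expressed in (possibly fractional)
// seconds into a time.Duration. Zero or negative values mean "start immediately".
func deferDuration(seconds float64) time.Duration {
	if seconds <= 0 {
		return 0
	}

	return time.Duration(seconds * float64(time.Second))
}

func main() {
	fmt.Println(deferDuration(0.1)) // prints "100ms"
	fmt.Println(deferDuration(1))   // prints "1s"
	fmt.Println(deferDuration(0))   // prints "0s"
}
```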

Registration

This module lets you register several Worker implementations, each with optional WorkerExecutionOption values.

Fx collects them and passes them to the WorkerPool, which is made available in the Fx container.

Registration is done with the AsWorker() function:

package main

import (
  "context"

  "github.com/ankorstore/yokai/fxconfig"
  "github.com/ankorstore/yokai/fxgenerate"
  "github.com/ankorstore/yokai/fxlog"
  "github.com/ankorstore/yokai/fxmetrics"
  "github.com/ankorstore/yokai/fxtrace"
  "github.com/ankorstore/yokai/fxworker"
  "github.com/ankorstore/yokai/worker"
  "go.uber.org/fx"
)

type ExampleWorker struct{}

func NewExampleWorker() *ExampleWorker {
  return &ExampleWorker{}
}

func (w *ExampleWorker) Name() string {
  return "example-worker"
}

func (w *ExampleWorker) Run(ctx context.Context) error {
  worker.CtxLogger(ctx).Info().Msg("run")

  return nil
}

func main() {
  fx.New(
    fxconfig.FxConfigModule,                      // load the module dependencies
    fxlog.FxLogModule,
    fxtrace.FxTraceModule,
    fxmetrics.FxMetricsModule,
    fxgenerate.FxGenerateModule,
    fxworker.FxWorkerModule,                      // load the module
    fx.Provide(
      fxworker.AsWorker(
        NewExampleWorker,                         // register the ExampleWorker
        worker.WithDeferredStartThreshold(1),     // with a deferred start threshold of 1 second
        worker.WithMaxExecutionsAttempts(2),      // and 2 max execution attempts
      ),
    ),
    fx.Invoke(func(pool *worker.WorkerPool) {
      pool.Start(context.Background())            // start the workers pool
    }),
  ).Run()
}

To get more details about the features made available for your workers (contextual logging, tracing, etc.), check the worker module documentation.
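
The ExampleWorker above returns immediately. For a long-running worker, a common Go pattern is to loop until the context passed to Run is cancelled. The sketch below illustrates that pattern with the standard library only. The Worker interface is redeclared locally (mirroring the Name and Run methods shown above) so the code compiles without the yokai modules, and TickerWorker, runFor and demo are hypothetical names. It assumes, without confirmation from this README, that the context given to Run is cancelled when the worker should stop.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Worker mirrors the two methods used in the example above.
// It is declared locally so this sketch is self-contained.
type Worker interface {
	Name() string
	Run(ctx context.Context) error
}

// TickerWorker is a hypothetical worker doing periodic work
// until its context is cancelled.
type TickerWorker struct {
	interval time.Duration
	ticks    int
}

func (w *TickerWorker) Name() string {
	return "ticker-worker"
}

func (w *TickerWorker) Run(ctx context.Context) error {
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil // treat cancellation as a clean shutdown
		case <-ticker.C:
			w.ticks++ // placeholder for the real periodic work
		}
	}
}

// runFor runs w until the timeout elapses and returns the error from Run.
func runFor(w Worker, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	return w.Run(ctx)
}

// demo runs a TickerWorker briefly and reports whether it ticked at least once.
func demo() (bool, error) {
	w := &TickerWorker{interval: 5 * time.Millisecond}
	err := runFor(w, 200*time.Millisecond)

	return w.ticks > 0, err
}

func main() {
	ticked, err := demo()
	fmt.Println("ticked:", ticked, "err:", err)
}
```

Returning nil on cancellation is a design choice in this sketch. Returning ctx.Err() instead would surface the shutdown as a failure, which may interact with the max execution attempts setting.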

Override

By default, the worker.WorkerPool is created by the DefaultWorkerPoolFactory.

If needed, you can provide your own factory and override the module:

package main

import (
  "context"

  "github.com/ankorstore/yokai/fxconfig"
  "github.com/ankorstore/yokai/fxgenerate"
  "github.com/ankorstore/yokai/fxlog"
  "github.com/ankorstore/yokai/fxmetrics"
  "github.com/ankorstore/yokai/fxtrace"
  "github.com/ankorstore/yokai/fxworker"
  "github.com/ankorstore/yokai/worker"
  "go.uber.org/fx"
)

type CustomWorkerPoolFactory struct{}

func NewCustomWorkerPoolFactory() worker.WorkerPoolFactory {
  return &CustomWorkerPoolFactory{}
}

func (f *CustomWorkerPoolFactory) Create(options ...worker.WorkerPoolOption) (*worker.WorkerPool, error) {
  return &worker.WorkerPool{...}, nil
}

func main() {
  fx.New(
    fxconfig.FxConfigModule,                     // load the module dependencies
    fxlog.FxLogModule,
    fxtrace.FxTraceModule,
    fxmetrics.FxMetricsModule,
    fxgenerate.FxGenerateModule,
    fxworker.FxWorkerModule,                     // load the module
    fx.Decorate(NewCustomWorkerPoolFactory),     // override the module with a custom factory
    fx.Invoke(func(pool *worker.WorkerPool) {
      pool.Start(context.Background())           // start the custom worker pool
    }),
  ).Run()
}
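
The Create method above receives variadic options, which follows Go's functional options pattern. As a rough sketch of what a custom factory might do with them, the example below applies options over defaults and validates the result. All types here (poolOptions, PoolOption, Pool, CustomPoolFactory, WithDefer, WithAttempts) are hypothetical local stand-ins and do not reflect the actual fields or options of worker.WorkerPool.

```go
package main

import (
	"errors"
	"fmt"
)

// poolOptions holds hypothetical pool settings.
type poolOptions struct {
	deferSeconds float64
	maxAttempts  int
}

// PoolOption is a hypothetical functional option mutating poolOptions.
type PoolOption func(*poolOptions)

// WithDefer sets the deferred start threshold, in seconds.
func WithDefer(seconds float64) PoolOption {
	return func(o *poolOptions) { o.deferSeconds = seconds }
}

// WithAttempts sets the maximum number of execution attempts.
func WithAttempts(n int) PoolOption {
	return func(o *poolOptions) { o.maxAttempts = n }
}

// Pool is a hypothetical pool built from resolved options.
type Pool struct {
	opts poolOptions
}

// CustomPoolFactory is a hypothetical factory with its own defaults and validation.
type CustomPoolFactory struct{}

// Create applies the given options over the defaults, then validates them.
func (f *CustomPoolFactory) Create(options ...PoolOption) (*Pool, error) {
	opts := poolOptions{deferSeconds: 0, maxAttempts: 1} // defaults

	for _, apply := range options {
		apply(&opts)
	}

	if opts.maxAttempts < 1 {
		return nil, errors.New("max attempts must be at least 1")
	}

	return &Pool{opts: opts}, nil
}

func main() {
	factory := &CustomPoolFactory{}

	pool, err := factory.Create(WithDefer(0.5), WithAttempts(3))
	fmt.Println(pool.opts.deferSeconds, pool.opts.maxAttempts, err) // prints "0.5 3 <nil>"

	_, err = factory.Create(WithAttempts(0))
	fmt.Println(err) // prints "max attempts must be at least 1"
}
```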