import "github.com/Bose/go-work"
- Constants
- Variables
- func InitTracing(serviceName string, tracingAgentHostPort string, opt ...Option) (tracer opentracing.Tracer, reporter jaeger.Reporter, closer io.Closer, err error)
- func RunE(fn func() error, opt ...Option) (err error)
- func StartSpan(operationName string) opentracing.Span
- func StartSpanWithParent(parent opentracing.SpanContext, operationName string) opentracing.Span
- func WrapChannel(chanToWrap interface{}) (<-chan interface{}, error)
- type Adapter
- func WithAggregateLogger(useBanner bool, timeFormat string, utc bool, logrusFieldNameForTraceID string, contextTraceIDField []byte, opt ...Option) Adapter
- func WithHealthCheck(health *HealthCheck, opt ...Option) Adapter
- func WithOpenTracing(operationPrefix []byte) Adapter
- func WithPrometheus(p *Prometheus) Adapter
- type Args
- type CommonWorker
- func NewCommonWorker(l Logger) *CommonWorker
- func NewCommonWorkerWithContext(ctx context.Context, l Logger) *CommonWorker
- func (w *CommonWorker) GetContext() context.Context
- func (w *CommonWorker) Perform(job *Job, opt ...Option) error
- func (w *CommonWorker) PerformAt(job *Job, t time.Time) error
- func (w *CommonWorker) PerformEvery(job *Job, interval time.Duration, opt ...Option) error
- func (w *CommonWorker) PerformIn(job *Job, d time.Duration) error
- func (w *CommonWorker) PerformReceive(job *Job, readChan interface{}, opt ...Option) error
- func (w *CommonWorker) Register(name string, h Handler) error
- func (w *CommonWorker) SetContext(ctx context.Context)
- func (w *CommonWorker) Start(ctx context.Context) error
- func (w *CommonWorker) Stop() error
- type ConcurrentJob
- func NewConcurrentJob(job Job, workerFactory WorkerFactory, performWith PerformWith, performEvery time.Duration, maxWorker int64, startInterval time.Duration, logger Logger, opt ...Option) (ConcurrentJob, error)
- func (j *ConcurrentJob) Register(name string, h Handler) error
- func (j *ConcurrentJob) RunningWorkers() int64
- func (j *ConcurrentJob) Start() error
- func (j *ConcurrentJob) Stop()
- type Context
- type Handler
- type HealthCheck
- type Job
- type Logger
- type Metric
- type Option
- func WithBanner(useBanner bool) Option
- func WithChannel(ch interface{}) Option
- func WithEngine(e *gin.Engine) Option
- func WithErrorPercentage(percentageOfErrors float64, lastNumOfMinutes int) Option
- func WithErrorPercentageByRequestCount(percentageOfErrors float64, minNumOfRequests, lastNumOfRequests int) Option
- func WithHealthHandler(h gin.HandlerFunc) Option
- func WithHealthPath(path string) Option
- func WithHealthTicker(ticker *time.Ticker) Option
- func WithIgnore(handlers ...string) Option
- func WithJob(j *Job) Option
- func WithMetricsPath(path string) Option
- func WithNamespace(ns string) Option
- func WithSampleProbability(sampleProbability float64) Option
- func WithSilentNoResponse(silent bool) Option
- func WithSubSystem(sub string) Option
- func WithSync(sync bool) Option
- type Options
- type PerformWith
- type Prometheus
- type Session
- type Status
- type Worker
- type WorkerFactory
channel.go concurrent_job.go context.go job.go logger.go metrics.go middleware.go middleware_aggregatelogger.go middleware_health.go middleware_opentracing.go middleware_prometheus.go options.go safe.go session.go span.go status.go worker.go worker_common.go
const (
// RequestSize is the index used by Job.Ctx.Set(string, interface{}) or Job.Ctx.Get(string) to communicate the request approximate size in bytes
// WithPrometheus will optionally report this info when RequestSize is found in the Job.Ctx
RequestSize = "keyRequestSize"
// ResponseSize is the index used by Job.Ctx.Set(string, interface{}) or Job.Ctx.Get(string) to communicate the response approximate size in bytes
// WithPrometheus will optionally report this info when ResponseSize is found in the Job.Ctx
ResponseSize = "keyResponseSize"
)
const (
// StatusUnknown was a job with an unknown status
StatusUnknown Status = -1
// StatusSuccess was a successful job
StatusSuccess = 200
// StatusBadRequest was a job with a bad request
StatusBadRequest = 400
// StatusForbidden was a forbidden job
StatusForbidden = 403
// StatusUnauthorized was an unauthorized job
StatusUnauthorized = 401
// StatusTimeout was a job that timed out
StatusTimeout = 408
// StatusNoResponse was a job that intentionally created no response (basically the conditions were met for a noop by the Job)
StatusNoResponse = 444
// StatusInternalError was a job with an internal error
StatusInternalError = 500
// StatusUnavailable was a job that was unavailable
StatusUnavailable = 503
)
const AggregateLogger = "aggregateLogger"
AggregateLogger defines the const string for getting the logger from a Job context
const DefaultHealthTickerDuration = 1 * time.Minute
DefaultHealthTickerDuration is the time duration between the recalculation of the status returned by HealthCheck.GetStatus()
var ContextTraceIDField string
ContextTraceIDField - used to find the trace id in the context - optional
func InitTracing(serviceName string, tracingAgentHostPort string, opt ...Option) (
tracer opentracing.Tracer,
reporter jaeger.Reporter,
closer io.Closer,
err error)
InitTracing will init opentracing. Options: WithSampleProbability (default: constant sampling).
func RunE(fn func() error, opt ...Option) (err error)
RunE runs the function safely, knowing that if it panics, the panic will be caught and returned as an error. It produces a stack trace and, if WithJob(job) is passed, the job's status is set to StatusInternalError.
func StartSpan(operationName string) opentracing.Span
StartSpan will start a new span with no parent span.
func StartSpanWithParent(parent opentracing.SpanContext, operationName string) opentracing.Span
StartSpanWithParent will start a new span with a parent span. example:
span:= StartSpanWithParent(c.Get("tracing-context"),
func WrapChannel(chanToWrap interface{}) (<-chan interface{}, error)
WrapChannel takes a concrete receiving chan in as an interface{}, and wraps it with an interface{} chan so you can treat all receiving channels the same way.
type Adapter func(Handler) Handler
Adapter defines the adapter middleware type
func WithAggregateLogger(
useBanner bool,
timeFormat string,
utc bool,
logrusFieldNameForTraceID string,
contextTraceIDField []byte,
opt ...Option) Adapter
WithAggregateLogger is a middleware adapter for aggregated logging (see go-gin-logrus)
func WithHealthCheck(health *HealthCheck, opt ...Option) Adapter
WithHealthCheck is an adapter middleware for healthcheck. It also adds a health http GET endpoint. It supports the Option WithErrorPercentage(percentageOfErrors float64, lastNumOfMinutes int), which allows you to override the default of 1.0 (100%) errors in the last 5 min.
func WithOpenTracing(operationPrefix []byte) Adapter
WithOpenTracing is an adapter middleware that adds opentracing
func WithPrometheus(p *Prometheus) Adapter
WithPrometheus is a middleware adapter that can be used to generate metrics for a single handler
type Args map[string]interface{}
Args is how parameters are passed to jobs
type CommonWorker struct {
// Logger for the worker
Logger Logger
// contains filtered or unexported fields
}
CommonWorker defines the typical common worker
func NewCommonWorker(l Logger) *CommonWorker
NewCommonWorker creates a new CommonWorker
func NewCommonWorkerWithContext(ctx context.Context, l Logger) *CommonWorker
NewCommonWorkerWithContext creates a new CommonWorker
func (*CommonWorker) GetContext
func (w *CommonWorker) GetContext() context.Context
GetContext from the worker
func (w *CommonWorker) Perform(job *Job, opt ...Option) error
Perform executes the job. If WithSync(true) Option then it's a blocking call, the default is false (so async)
func (w *CommonWorker) PerformAt(job *Job, t time.Time) error
PerformAt performs a job at a particular time using a goroutine.
func (*CommonWorker) PerformEvery
func (w *CommonWorker) PerformEvery(job *Job, interval time.Duration, opt ...Option) error
PerformEvery executes the job on the interval. If the WithSync(true) Option is passed, then the operation blocks until it's done, which means only one instance can be executed at a time. The default is WithSync(false), so there's no blocking and you could get multiple instances running at a time if the latency is longer than the interval.
func (w *CommonWorker) PerformIn(job *Job, d time.Duration) error
PerformIn performs a job after the "in" time has expired.
func (*CommonWorker) PerformReceive
func (w *CommonWorker) PerformReceive(job *Job, readChan interface{}, opt ...Option) error
PerformReceive will loop on receiving data from the readChan (passed as a simple interface{}). This uses the Job ctx for timeouts and cancellation (just like the rest of the framework)
func (w *CommonWorker) Register(name string, h Handler) error
Register Handler with the worker
func (*CommonWorker) SetContext
func (w *CommonWorker) SetContext(ctx context.Context)
SetContext for the worker
func (w *CommonWorker) Start(ctx context.Context) error
Start the worker
func (w *CommonWorker) Stop() error
Stop the worker and you must call Stop() to clean up the CommonWorker internal Ctx (or it will leak memory)
type ConcurrentJob struct {
// PerformWith defines whether the job will be performed as a recurring job (e.g. PerformEveryWithSync)
// or just once (e.g. PerformWithSync)
PerformWith PerformWith
// PerformEvery defines the duration between executions of PerformEveryWithSync jobs
PerformEvery time.Duration
// MaxWorkers for the concurrent Job
MaxWorkers int64
// Job to run concurrently
Job Job
PerformReceiveChan interface{}
// contains filtered or unexported fields
}
ConcurrentJob represents a job to be run concurrently
func NewConcurrentJob(
job Job,
workerFactory WorkerFactory,
performWith PerformWith,
performEvery time.Duration,
maxWorker int64,
startInterval time.Duration,
logger Logger,
opt ...Option,
) (ConcurrentJob, error)
NewConcurrentJob makes a new job
func (j *ConcurrentJob) Register(name string, h Handler) error
Register a handler for the Job's workers
func (*ConcurrentJob) RunningWorkers
func (j *ConcurrentJob) RunningWorkers() int64
func (j *ConcurrentJob) Start() error
Start all the work
func (j *ConcurrentJob) Stop()
Stop all the work and you must call Stop() to clean up the ConcurrentJob Ctx (or it will leak memory)
type Context struct {
// context.Context allows it to be a compatible context
context.Context
// contains filtered or unexported fields
}
Context for the Job and is reset for every execution
func NewContext(ctx context.Context) Context
NewContext factory
func (c *Context) Get(k string) (interface{}, bool)
Get a key/value
func (c *Context) Set(k string, v interface{})
Set a key/value
func (c *Context) SetStatus(s Status)
SetStatus for the context
func (c *Context) Status() Status
Status retrieves the context's status
type Handler func(j *Job) error
Handler is executed as the work for a given Job. It also defines the interface for handlers that can be used by middleware adapters
func Adapt(h Handler, adapters ...Adapter) Handler
Adapt a handler with the provided middleware adapters
type HealthCheck struct {
// HealthPath is the GET url path for the endpoint
HealthPath string
// Engine is the gin.Engine that should serve the endpoint
Engine *gin.Engine
// Handler is the gin Handler to use for the endpoint
Handler gin.HandlerFunc
// contains filtered or unexported fields
}
HealthCheck provides a healthcheck endpoint for the work
func NewHealthCheck(opt ...Option) *HealthCheck
NewHealthCheck creates a new HealthCheck with the options provided. Options: WithEngine(*gin.Engine), WithHealthPath(string), WithHealthHandler(gin.HandlerFunc), WithHealthTicker(*time.Ticker)
func (h *HealthCheck) Close()
Close cleans up all the HealthCheck resources
func (*HealthCheck) DefaultHealthHandler
func (h *HealthCheck) DefaultHealthHandler() gin.HandlerFunc
func (h *HealthCheck) GetStatus() int
GetStatus returns the current health status
func (h *HealthCheck) SetStatus(s int)
SetStatus sets the current health status
func (*HealthCheck) WithEngine
func (h *HealthCheck) WithEngine(e *gin.Engine)
WithEngine lets you set the *gin.Engine if it's created after you've created the *HealthCheck
type Job struct {
// Queue the job should be placed into
Queue string
// ctx related to the execution of a job - Perform(job) gets a new ctx every time
Ctx *Context
// Args that will be passed to the Handler as the 2nd parameter when run
Args Args
// Handler that will be run by the worker
Handler string
// Timeout for every execution of job
Timeout time.Duration
}
Job to be processed by a Worker
func (j *Job) Copy() Job
Copy provides a deep copy of the Job
type Logger interface {
Debugf(string, ...interface{})
Infof(string, ...interface{})
Errorf(string, ...interface{})
Debug(...interface{})
Info(...interface{})
Error(...interface{})
}
Logger is used by worker to write logs
type Metric interface {
Add(n float64)
String() string
Value() float64
}
Metric is a single meter (a counter for now, but in the future: gauge or histogram, optionally - with history)
func NewMetricCounter(frames ...string) Metric
NewMetricCounter returns a counter metric that increments the value with each incoming number.
func NewMetricStatusGauge(min int, max int, frames ...string) Metric
NewMetricStatusGauge is a factory for statusGauge Metrics
type Option func(Options)
Option - how Options are passed as arguments
func WithBanner(useBanner bool) Option
WithBanner specifies whether to use a banner (see the useBanner parameter of WithAggregateLogger)
func WithChannel(ch interface{}) Option
WithChannel optional channel parameter
func WithEngine(e *gin.Engine) Option
WithEngine is an option allowing to set the gin engine when initializing with New. Example : r := gin.Default() p := work.NewPrometheus(WithEngine(r))
func WithErrorPercentage(percentageOfErrors float64, lastNumOfMinutes int) Option
WithErrorPercentage allows you to override the default of 1.0 (100%) with the % you want for error rate.
func WithErrorPercentageByRequestCount(percentageOfErrors float64, minNumOfRequests, lastNumOfRequests int) Option
func WithHealthHandler(h gin.HandlerFunc) Option
WithHealthHandler override the default health endpoint handler
func WithHealthPath(path string) Option
WithHealthPath override the default path for the health endpoint
func WithHealthTicker(ticker *time.Ticker) Option
func WithIgnore(handlers ...string) Option
WithIgnore is used to disable instrumentation on some routes
func WithJob(j *Job) Option
WithJob optional Job parameter
func WithMetricsPath(path string) Option
WithMetricsPath is an option allowing to set the metrics path when initializing with New. Example : work.New(work.WithMetricsPath("/mymetrics"))
func WithNamespace(ns string) Option
WithNamespace is an option allowing to set the namespace when initializing with New. Example : work.New(work.WithNamespace("my_namespace"))
func WithSampleProbability(sampleProbability float64) Option
WithSampleProbability - optional sample probability
func WithSilentNoResponse(silent bool) Option
WithSilentNoResponse specifies that StatusNoResponse requests should be silent (no logging)
func WithSubSystem(sub string) Option
WithSubSystem is an option allowing to set the subsystem when initializing with New. Example : work.New(work.WithSubSystem("my_system"))
func WithSync(sync bool) Option
WithSync optional synchronous execution
type Options map[string]interface{}
Options = how options are represented
func GetOpts(opt ...Option) Options
GetOpts - iterate the inbound Options and return a struct
func (o *Options) Get(name string) (interface{}, bool)
Get a specific option by name
type PerformWith int
const (
PerformWithUnknown PerformWith = iota
PerformWithSync
PerformWithAsync
PerformEveryWithSync
PerformEveryWithAsync
PerformReceiveWithSync
PerformReceiveWithAsync
)
func (p PerformWith) String() string
type Prometheus struct {
MetricsPath string
Namespace string
Subsystem string
Ignored isPresentMap
Engine *gin.Engine
// contains filtered or unexported fields
}
Prometheus contains the metrics gathered by the instance and its path
func NewPrometheus(opt ...Option) *Prometheus
NewPrometheus will initialize a new Prometheus instance with the given options. If no options are passed, sane defaults are used. If a router is passed using the WithEngine() option, this instance will automatically bind to it.
func (*Prometheus) WithEngine
func (p *Prometheus) WithEngine(e *gin.Engine)
WithEngine is a method that should be used if the engine is set after middleware initialization
type Session struct {
// contains filtered or unexported fields
}
Session is used to pass session info to a Job. This is a good spot to put things like *redis.Pool or *sqlx.DB for outbox connection pools.
func NewSession() Session
NewSession factory
func (c *Session) Get(k string) (interface{}, bool)
Get a key/value
func (c *Session) Set(k string, v interface{})
Set a key/value
type Status int
Status for a job's execution (Perform)
type Worker interface {
// Start the worker
Start(context.Context) error
// Stop the worker
Stop() error
// PerformEvery a job every interval (loop)
// if WithSync(true) Option, then the operation blocks until it's done which means only one instance can be executed at a time
// the default is WithSync(false), so there's no blocking and you could get multiple instances running at a time if the latency is longer than the interval
PerformEvery(*Job, time.Duration, ...Option) error
// Perform a job as soon as possible. If WithSync(true) Option then it's a blocking call, the default is false (so async)
Perform(*Job, ...Option) error
// PerformAt performs a job at a particular time and always async
PerformAt(*Job, time.Time) error
// PerformIn performs a job after waiting for a specified amount of time and always async
PerformIn(*Job, time.Duration) error
// PerformReceive performs a job for every value received from channel
PerformReceive(*Job, interface{}, ...Option) error
// Register a Handler
Register(string, Handler) error
// GetContext returns the worker context
GetContext() context.Context
// SetContext sets the worker context
SetContext(context.Context)
}
func NewCommonWorkerFactory(ctx context.Context, l Logger) Worker
NewCommonWorkerFactory is a simple adapter that allows the factory to conform to the WorkerFactory type
type WorkerFactory func(context.Context, Logger) Worker
Generated by godoc2md