-
Notifications
You must be signed in to change notification settings - Fork 99
/
pool.go
61 lines (53 loc) · 1.53 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package worker
import (
"context"
"github.com/alitto/pond"
"github.com/berachain/offchain-sdk/log"
)
// and other functionality to the pool.
type Pool struct {
name string
logger log.Logger
*pond.WorkerPool
}
type TaskGroup struct {
*pond.TaskGroupWithContext
}
// NewPool creates a new pool.
func NewPool(ctx context.Context, logger log.Logger, cfg *PoolConfig) *Pool {
p := &Pool{
name: cfg.Name,
WorkerPool: pond.New(
int(cfg.MaxWorkers),
int(cfg.MaxQueuedJobs),
pond.Strategy(resizerFromString(cfg.ResizingStrategy)),
pond.Context(ctx), // allows for cancelling jobs.
pond.MinWorkers(int(cfg.MinWorkers)),
pond.PanicHandler(PanicHandler(logger)),
),
logger: logger,
}
p.setupMetrics(cfg.PrometheusPrefix)
return p
}
// Logger returns the logger for the pool.
func (p *Pool) Logger() log.Logger {
return p.logger.With("namespace", p.name+"-pool")
}
// StopAndWait stops the pool and waits for all workers to finish.
func (p *Pool) StopAndWait() {
p.Logger().Info("stopping worker pool")
p.Logger().Info("waiting for workers to finish",
"running", p.RunningWorkers(), "idle", p.IdleWorkers(),
)
defer p.Logger().Info("workers finished")
p.WorkerPool.StopAndWait()
}
// Stop stops the pool without waiting for all workers to finish. NOTE: Tasks being executed by
// workers will continue until completion (unless the process is terminated). Tasks in the queue
// will not be executed.
func (p *Pool) Stop() {
p.Logger().Info("stopping worker pool")
defer p.Logger().Info("workers halted")
p.WorkerPool.Stop()
}