forked from pingcap/br
/
worker.go
97 lines (83 loc) · 2.04 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package utils
import (
"github.com/pingcap/log"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// WorkerPool contains a pool of workers.
//
// A task may only start after a Worker has been taken from the pool, and the
// Worker is put back once the task returns, so the number of tasks running at
// the same time never exceeds the number of workers the pool was built with.
type WorkerPool struct {
// limit is the number of workers NewWorkerPool created for this pool.
limit uint
// workers holds the idle workers: receiving from it acquires a worker,
// sending to it releases one.
workers chan *Worker
// name identifies the pool in the "wait for workers" debug log.
name string
}
// Worker is a single slot of a WorkerPool, identified by ID.
type Worker struct {
// ID is assigned by NewWorkerPool, counting up from 1.
ID uint64
}
type (
	// taskFunc is a unit of work that needs nothing from the worker running it.
	taskFunc func()

	// identifiedTaskFunc is a unit of work that receives the ID of the worker
	// running it.
	identifiedTaskFunc func(uint64)
)
// NewWorkerPool returns a WorkerPool labelled name that starts out with limit
// idle workers, whose IDs run from 1 to limit.
func NewWorkerPool(limit uint, name string) *WorkerPool {
	pool := &WorkerPool{
		limit:   limit,
		workers: make(chan *Worker, limit),
		name:    name,
	}
	// The channel's capacity equals limit, so filling it here never blocks.
	for n := uint(1); n <= limit; n++ {
		pool.workers <- &Worker{ID: uint64(n)}
	}
	return pool
}
// Apply runs fn on its own goroutine once a worker is available.
//
// The call blocks the caller until a worker has been acquired; the worker goes
// back to the pool when fn returns.
func (pool *WorkerPool) Apply(fn taskFunc) {
	run := func(w *Worker) {
		defer pool.recycle(w)
		fn()
	}
	// Arguments of a go statement are evaluated in the calling goroutine, so
	// the worker is acquired here, before the new goroutine starts.
	go run(pool.apply())
}
// ApplyWithID runs fn on its own goroutine once a worker is available, passing
// fn the ID of the worker it was given.
//
// The call blocks the caller until a worker has been acquired; the worker goes
// back to the pool when fn returns.
func (pool *WorkerPool) ApplyWithID(fn identifiedTaskFunc) {
	run := func(w *Worker) {
		defer pool.recycle(w)
		fn(w.ID)
	}
	// Arguments of a go statement are evaluated in the calling goroutine, so
	// the worker is acquired here, before the new goroutine starts.
	go run(pool.apply())
}
// ApplyOnErrorGroup acquires a worker, blocking the caller until one is
// available, and then schedules fn through eg.Go.
//
// The worker goes back to the pool when fn returns, and the error fn returns is
// what the function handed to eg.Go returns.
func (pool *WorkerPool) ApplyOnErrorGroup(eg *errgroup.Group, fn func() error) {
	acquired := pool.apply()
	task := func() error {
		defer pool.recycle(acquired)
		return fn()
	}
	eg.Go(task)
}
// ApplyWithIDInErrorGroup acquires a worker, blocking the caller until one is
// available, and then schedules fn through eg.Go, passing fn the worker's ID.
//
// The worker goes back to the pool when fn returns, and the error fn returns is
// what the function handed to eg.Go returns.
func (pool *WorkerPool) ApplyWithIDInErrorGroup(eg *errgroup.Group, fn func(id uint64) error) {
	acquired := pool.apply()
	task := func() error {
		defer pool.recycle(acquired)
		return fn(acquired.ID)
	}
	eg.Go(task)
}
// apply takes a worker out of the pool, blocking until one is available.
//
// When no worker is idle at the time of the call, a debug message naming the
// pool is logged before the blocking wait begins.
func (pool *WorkerPool) apply() *Worker {
	// Fast path: grab an idle worker without blocking.
	select {
	case idle := <-pool.workers:
		return idle
	default:
	}

	// Nothing idle right now; note the stall, then wait for a recycled worker.
	log.Debug("wait for workers", zap.String("pool", pool.name))
	return <-pool.workers
}
// recycle puts worker back into the pool so that another task can use it.
//
// It panics when worker is nil, since a nil worker cannot have come from apply.
func (pool *WorkerPool) recycle(worker *Worker) {
	if worker != nil {
		pool.workers <- worker
		return
	}
	panic("invalid restore worker")
}
// HasWorker reports whether the pool holds at least one idle worker at the
// moment of the call. When other goroutines are using the pool, the answer may
// already be out of date by the time the caller acts on it.
func (pool *WorkerPool) HasWorker() bool {
	idle := len(pool.workers)
	return idle != 0
}