-
Notifications
You must be signed in to change notification settings - Fork 1
/
moderator.go
123 lines (111 loc) · 2.57 KB
/
moderator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/*
© 2020–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"context"
"fmt"
"sync"
"github.com/haraldrudell/parl/perrors"
)
// defaultParallelism is the ticket count NewModerator falls back to
// when it is called with a parallelism of zero.
const defaultParallelism = 20
/*
Moderator runs functions while capping how many of them execute at once.
It works as a ticketing system: Do blocks until a ticket is free, runs the
function, and hands the ticket back when the function returns or panics.

	m := NewModerator(20, ctx)
	m.Do(func() (err error) { // waiting here for a ticket
		// got a ticket!
		…
		return or panic // ticket automatically returned
	})
	m.String() → waiting: 2(20)
*/
type Moderator struct {
	// parallelism is the maximum number of tickets handed out at any time
	parallelism uint64
	// cond carries the mutex guarding active and waiting, and is used
	// to wake goroutines blocked waiting for a ticket
	cond *sync.Cond
	// ctx is checked before a ticket is handed out: once it reports an
	// error, no more tickets are issued
	ctx context.Context
	// active counts tickets currently handed out, behind lock
	active uint64
	// waiting counts goroutines blocked waiting for a ticket, behind lock
	waiting uint64
}
// NewModerator returns a Moderator that hands out at most parallelism
// tickets at a time.
//   - parallelism zero selects defaultParallelism
//   - ctx must not be nil: a nil context causes a panic
//   - a goroutine is launched that waits for ctx to be done and then
//     wakes every goroutine blocked waiting for a ticket
func NewModerator(parallelism uint64, ctx context.Context) (mo *Moderator) {
	if ctx == nil {
		panic(perrors.New("NewModerator with nil context"))
	}
	if parallelism == 0 {
		parallelism = defaultParallelism
	}
	mo = &Moderator{
		parallelism: parallelism,
		cond:        sync.NewCond(&sync.Mutex{}),
		ctx:         ctx,
	}
	go mo.shutdownThread()
	return
}
// Do invokes fn once a ticket is available, keeping the number of
// concurrently running functions within the moderator’s parallelism.
//   - a nil fn yields an error without any ticket being taken
//   - if the moderator’s context has been canceled, fn is not invoked
//     and the error from getTicket (the context’s error) is returned
//   - otherwise err is whatever fn returns
//   - the ticket is handed back when fn returns or panics
func (mo *Moderator) Do(fn func() error) (err error) {
	if fn == nil {
		err = New("Moderator.Do with nil function")
		return
	}

	// blocks here until a ticket is free or the context is canceled
	err = mo.getTicket()
	if err != nil {
		return
	}
	// a ticket is held from here on: deferring guarantees its return
	// even if fn panics
	defer mo.returnTicket()

	err = fn()
	return
}
// getTicket blocks until a ticket can be claimed or the moderator’s
// context reports an error.
//   - on success, active is incremented and err is nil
//   - on context error, no ticket is taken and err is that error
//   - while blocked, the caller is counted exactly once in waiting
func (mo *Moderator) getTicket() (err error) {
	mo.cond.L.Lock()
	defer mo.cond.L.Unlock()

	// queued records whether this call has been added to mo.waiting
	var queued bool

	// every pass, including each wake-up from Wait, re-checks the
	// context before looking for a free ticket
	for err = mo.ctx.Err(); err == nil; err = mo.ctx.Err() {
		if mo.active < mo.parallelism {
			mo.active++ // ticket claimed
			break
		}
		if !queued {
			queued = true
			mo.waiting++
		}
		// releases the lock while blocked and re-acquires it on wake-up
		mo.cond.Wait()
	}

	if queued {
		mo.waiting--
	}
	return
}
// returnTicket hands a ticket back: it decrements active and signals
// one goroutine blocked waiting for a ticket, all while holding the lock.
func (mo *Moderator) returnTicket() {
	mo.cond.L.Lock()
	defer mo.cond.L.Unlock()

	mo.active--
	// the signaled goroutine cannot resume until the lock is released
	// on return, so it observes the decremented count
	mo.cond.Signal()
}
// shutdownThread waits for the moderator’s context to be done and then
// wakes every goroutine blocked in getTicket so that each one observes
// the context error and returns.
//
// The broadcast is issued while holding the lock. getTicket checks the
// context and enters cond.Wait under that same lock, so the broadcast
// can only happen before the check (the context is already done and the
// check fails) or after the goroutine is registered as a waiter (it is
// woken). Broadcasting without the lock could land between those two
// steps and leave that goroutine blocked despite cancellation.
func (mo *Moderator) shutdownThread() {
	<-mo.ctx.Done()

	mo.cond.L.Lock()
	defer mo.cond.L.Unlock()

	mo.cond.Broadcast()
}
// Status returns a snapshot of the moderator’s state.
//   - parallelism: the configured ticket limit
//   - active: number of tickets currently handed out
//   - waiting: number of goroutines blocked waiting for a ticket
//   - isShutdown: true if the moderator’s context reports an error
//
// active and waiting are read together under the lock. isShutdown is
// sampled after the lock has been released.
func (mo *Moderator) Status() (parallelism uint64, active uint64, waiting uint64, isShutdown bool) {
	mo.cond.L.Lock()
	active, waiting = mo.active, mo.waiting
	mo.cond.L.Unlock()

	return mo.parallelism, active, waiting, mo.ctx.Err() != nil
}
// String returns a one-line summary of the moderator’s state.
//   - with free tickets: “available: free(limit)”
//   - with all tickets taken: “waiting: waiters(limit)”
//   - “ shutdown” is appended once the context reports an error
func (mo *Moderator) String() (s string) {
	limit, inUse, queued, isDown := mo.Status()

	if inUse >= limit {
		s = fmt.Sprintf("waiting: %d(%d)", queued, limit)
	} else {
		s = fmt.Sprintf("available: %d(%d)", limit-inUse, limit)
	}

	var suffix string
	if isDown {
		suffix = " shutdown"
	}
	return s + suffix
}