-
Notifications
You must be signed in to change notification settings - Fork 1
/
serialdo.go
147 lines (133 loc) · 3.32 KB
/
serialdo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/*
© 2021–present Harald Rudell <harald.rudell@gmail.com> (https://haraldrudell.github.io/haraldrudell/)
ISC License
*/
package parl
import (
"context"
"strconv"
"sync"
"sync/atomic"
"time"
)
// serialdo invokes method in sequence
type SerialDo struct {
inChan chan *time.Time
inChanLock sync.Mutex
isBusy AtomicBool
isShutdown AtomicBool
thunk func()
cbFunc SerialDoFunc
ErrCh chan error
ID string
Wg sync.WaitGroup
ctx context.Context
}
// NewSerialDo SerialDo. errors on sdo.ErrCh
func NewSerialDo(thunk func(), eventReceiver SerialDoFunc, ctx context.Context) (sdo *SerialDo) {
if thunk == nil {
panic(Errorf("NewSerialDo with thunk nil"))
}
if ctx == nil {
ctx = context.Background()
}
if eventReceiver == nil {
eventReceiver = func(e SerialDoEvent, s *SerialDo, t *time.Time) {}
}
ID := strconv.FormatUint(atomic.AddUint64(&serialDoNo, 1), 10)
sdo = &SerialDo{
inChan: make(chan *time.Time, 1), inChanLock: sync.Mutex{},
thunk: thunk, cbFunc: eventReceiver, ErrCh: make(chan error),
ID: ID, ctx: ctx,
}
sdo.Wg.Add(1)
go sdo.inReader()
return
}
type SerialDoEvent uint8
type SerialDoFunc func(SerialDoEvent, *SerialDo, *time.Time)
var serialDoNo uint64
const (
SerialDoReady = 0 + iota
SerialDoLaunch // from idle, now time
SerialDoPending // queued up invocation, request time
SerialDoPendingLaunch // launch of pending invocation, request time
SerialDoIdle // busy since
)
// Invoke thunk serially, maximum queue one invocation, drop additional invocation requests prior to idle. non-blocking Thread-safe
func (sdo *SerialDo) Do(now time.Time) (nowPending bool) {
if sdo.isShutdown.IsTrue() {
panic(Errorf("SerialDo#%s: Do after shutdown", sdo.ID))
}
if len(sdo.inChan) > 0 || !sdo.sendInChan(&now) {
return
}
nowPending = sdo.isBusy.IsTrue()
if nowPending {
sdo.cbFunc(SerialDoPending, sdo, &now)
}
return
}
func (sdo *SerialDo) sendInChan(now *time.Time) bool {
sdo.inChanLock.Lock()
defer sdo.inChanLock.Unlock()
if len(sdo.inChan) > 0 || sdo.isShutdown.IsTrue() {
return false
}
sdo.inChan <- now
return true
}
func (sdo *SerialDo) Shutdown() {
if !sdo.isShutdown.Set() {
return // was already set
}
sdo.inChanLock.Lock()
defer sdo.inChanLock.Unlock()
close(sdo.inChan) // closes run thread
}
func (sdo *SerialDo) inReader() {
defer sdo.Wg.Done()
defer close(sdo.ErrCh)
defer Recover("SerialDo#"+sdo.ID+".inReader", nil, func(e error) { sdo.ErrCh <- e })
sdo.cbFunc(SerialDoReady, sdo, nil)
var launchTime *time.Time
var ok bool
for {
select {
case launchTime, ok = <-sdo.inChan:
case _, ok = <-sdo.ctx.Done():
sdo.Shutdown()
}
if !ok {
break
}
// data from inChan: launch
sdo.cbFunc(SerialDoLaunch, sdo, launchTime)
for {
sdo.isBusy.Set()
ch := make(chan struct{})
sdo.Wg.Add(1)
go func() {
defer sdo.Wg.Done()
defer close(ch)
sdo.thunk()
}()
select {
case <-ch:
case _, ok = <-sdo.ctx.Done():
sdo.Shutdown()
}
if !ok || // application is terminating
len(sdo.inChan) < 1 { // no pending requests: enter idle state
sdo.isBusy.Clear()
break
}
pending := <-sdo.inChan
sdo.cbFunc(SerialDoPendingLaunch, sdo, pending)
}
if !ok {
break
}
sdo.cbFunc(SerialDoIdle, sdo, launchTime) // now idle, busy since launchTime
}
}