/
sync.go
556 lines (478 loc) · 11.6 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
package astikit
import (
"bytes"
"context"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
)
// Stat names
const (
	// StatNameWorkRatio is the name of the work-ratio stat exposed by Chan.StatOptions()
	// (percentage of elapsed time spent executing funcs).
	StatNameWorkRatio = "astikit.work.ratio"
)
// Chan constants
const (
	// Calling Add() only blocks if the chan has been started and the ctx
	// has not been canceled
	ChanAddStrategyBlockWhenStarted = "block.when.started"
	// Calling Add() never blocks
	ChanAddStrategyNoBlock = "no.block"
	// ChanOrderFIFO processes funcs in the order they were added (default).
	ChanOrderFIFO = "fifo"
	// ChanOrderFILO processes the most recently added func first.
	ChanOrderFILO = "filo"
)
// Chan is an object capable of executing funcs in a specific order while controlling the conditions
// in which adding new funcs is blocking
// Check out ChanOptions for detailed options
type Chan struct {
	cancel           context.CancelFunc // Cancels ctx; set by Start()
	c                *sync.Cond         // Signaled when a func is added or the ctx is cancelled
	ctx              context.Context    // Derived from the ctx given to Start()
	fs               []func()           // Buffer of funcs waiting to be executed
	mc               *sync.Mutex        // Locks ctx
	mf               *sync.Mutex        // Locks fs
	o                ChanOptions
	running          uint32          // 1 while Start() is looping; accessed atomically
	statWorkDuration *AtomicDuration // Total time spent executing funcs
}
// ChanOptions are Chan options
type ChanOptions struct {
	// Determines the conditions in which Add() blocks. See constants with pattern ChanAddStrategy*
	// Default is ChanAddStrategyNoBlock
	AddStrategy string
	// Order in which the funcs will be processed. See constants with pattern ChanOrder*
	// Default is ChanOrderFIFO
	Order string
	// By default the funcs not yet processed when the context is cancelled are dropped.
	// If "ProcessAll" is true, ALL funcs are processed even after the context is cancelled.
	// However, no funcs can be added after the context is cancelled
	ProcessAll bool
}
// NewChan creates a new Chan with the provided options.
func NewChan(o ChanOptions) *Chan {
	c := &Chan{o: o}
	c.c = sync.NewCond(&sync.Mutex{})
	c.mc = &sync.Mutex{}
	c.mf = &sync.Mutex{}
	c.statWorkDuration = NewAtomicDuration(0)
	return c
}
// Start starts the chan by looping through functions in the buffer and
// executing them if any, or waiting for a new one otherwise.
// It is blocking and returns once the provided ctx is cancelled (and, if
// ProcessAll is true, once the buffer has been drained). Calling Start while
// it is already running is a no-op.
func (c *Chan) Start(ctx context.Context) {
	// Make sure to start only once
	if atomic.CompareAndSwapUint32(&c.running, 0, 1) {
		// Update status
		defer atomic.StoreUint32(&c.running, 0)

		// Create context
		c.mc.Lock()
		c.ctx, c.cancel = context.WithCancel(ctx)
		d := c.ctx.Done()
		c.mc.Unlock()

		// Handle context: wake the loop up so it can observe the cancellation
		go func() {
			// Wait for context to be done
			<-d

			// Signal
			c.c.L.Lock()
			c.c.Signal()
			c.c.L.Unlock()
		}()

		// Loop
		for {
			// Lock cond here in case a func is added between retrieving l and doing the if on it
			c.c.L.Lock()

			// Get number of funcs in buffer
			c.mf.Lock()
			l := len(c.fs)
			c.mf.Unlock()

			// Only return if context has been cancelled and:
			//   - the user wants to drop funcs that have not yet been processed
			//   - the buffer is empty otherwise
			c.mc.Lock()
			if c.ctx.Err() != nil && (!c.o.ProcessAll || l == 0) {
				c.mc.Unlock()
				c.c.L.Unlock()
				return
			}
			c.mc.Unlock()

			// No funcs in buffer: wait for a signal (new func or cancellation)
			if l == 0 {
				c.c.Wait()
				c.c.L.Unlock()
				continue
			}
			c.c.L.Unlock()

			// Get first func
			c.mf.Lock()
			fn := c.fs[0]
			c.mf.Unlock()

			// Execute func, timing it for the work-ratio stat
			n := time.Now()
			fn()
			c.statWorkDuration.Add(time.Since(n))

			// Remove first func. Guard against Reset() having emptied the
			// buffer while the func was executing: re-slicing an empty slice
			// with [1:] would panic.
			c.mf.Lock()
			if len(c.fs) > 0 {
				c.fs = c.fs[1:]
			}
			c.mf.Unlock()
		}
	}
}
// Stop stops the chan by cancelling its context. It is a no-op if the chan
// has never been started.
func (c *Chan) Stop() {
	c.mc.Lock()
	defer c.mc.Unlock()
	if c.cancel == nil {
		return
	}
	c.cancel()
}
// Add adds a new item to the chan.
// Depending on the AddStrategy option it either returns immediately or blocks
// until the func has been executed.
func (c *Chan) Add(i func()) {
	// Refuse new funcs once the context has been cancelled
	c.mc.Lock()
	cancelled := c.ctx != nil && c.ctx.Err() != nil
	c.mc.Unlock()
	if cancelled {
		return
	}

	// With the blocking strategy, wrap the func so we can wait for its
	// execution before returning
	fn := i
	var wg *sync.WaitGroup
	if c.o.AddStrategy == ChanAddStrategyBlockWhenStarted {
		wg = &sync.WaitGroup{}
		wg.Add(1)
		inner := i
		fn = func() {
			defer wg.Done()
			inner()
		}
	}

	// Queue the func according to the configured order
	c.mf.Lock()
	switch c.o.Order {
	case ChanOrderFILO:
		c.fs = append([]func(){fn}, c.fs...)
	default:
		c.fs = append(c.fs, fn)
	}
	c.mf.Unlock()

	// Wake up the processing loop
	c.c.L.Lock()
	c.c.Signal()
	c.c.L.Unlock()

	// Block until executed if requested
	if wg != nil {
		wg.Wait()
	}
}
// Reset resets the chan by dropping every func still waiting in the buffer.
func (c *Chan) Reset() {
	c.mf.Lock()
	c.fs = []func(){}
	c.mf.Unlock()
}
// ChanStats represents the chan stats
type ChanStats struct {
	// WorkDuration is the total time spent executing funcs.
	WorkDuration time.Duration
}
// Stats returns a snapshot of the chan stats.
func (c *Chan) Stats() ChanStats {
	var s ChanStats
	s.WorkDuration = c.statWorkDuration.Duration()
	return s
}
// StatOptions returns the chan stat options, currently a single work-ratio stat.
func (c *Chan) StatOptions() []StatOptions {
	md := &StatMetadata{
		Description: "Percentage of time doing work",
		Label:       "Work ratio",
		Name:        StatNameWorkRatio,
		Unit:        "%",
	}
	return []StatOptions{{
		Metadata: md,
		Valuer:   NewAtomicDurationPercentageStat(c.statWorkDuration),
	}}
}
// BufferPool represents a *bytes.Buffer pool
type BufferPool struct {
	bp *sync.Pool // Underlying pool of *bytes.Buffer
}
// NewBufferPool creates a new BufferPool backed by a sync.Pool of *bytes.Buffer.
func NewBufferPool() *BufferPool {
	p := &sync.Pool{
		New: func() interface{} { return &bytes.Buffer{} },
	}
	return &BufferPool{bp: p}
}
// New retrieves a buffer from the pool, wrapped in a BufferPoolItem that
// returns it to the pool on Close().
func (p *BufferPool) New() *BufferPoolItem {
	b := p.bp.Get().(*bytes.Buffer)
	return newBufferPoolItem(b, p.bp)
}
// BufferPoolItem represents a BufferPool item
type BufferPoolItem struct {
	*bytes.Buffer             // The pooled buffer itself
	bp            *sync.Pool // Pool the buffer is returned to on Close()
}
// newBufferPoolItem wraps b so that Close() puts it back into bp.
func newBufferPoolItem(b *bytes.Buffer, bp *sync.Pool) *BufferPoolItem {
	i := &BufferPoolItem{bp: bp}
	i.Buffer = b
	return i
}
// Close implements the io.Closer interface.
// It empties the buffer and returns it to the pool; the item must not be used
// after Close has been called. It always returns nil.
func (i *BufferPoolItem) Close() error {
	i.Reset() // Empty the buffer so the next user starts clean
	i.bp.Put(i.Buffer)
	return nil
}
// GoroutineLimiter is an object capable of doing several things in parallel while maintaining the
// max number of things running in parallel under a threshold
type GoroutineLimiter struct {
	busy   int        // Number of goroutines currently running; guarded by c.L
	c      *sync.Cond // Signaled when a goroutine finishes or the limiter is closed
	ctx    context.Context
	cancel context.CancelFunc
	o      GoroutineLimiterOptions
}
// GoroutineLimiterOptions represents GoroutineLimiter options
type GoroutineLimiterOptions struct {
	// Max is the maximum number of goroutines running in parallel.
	// Values <= 0 are treated as 1.
	Max int
}
// NewGoroutineLimiter creates a new GoroutineLimiter.
// A Max <= 0 is normalized to 1. Close() must be called to release the
// limiter's internal goroutine.
func NewGoroutineLimiter(o GoroutineLimiterOptions) (l *GoroutineLimiter) {
	if o.Max <= 0 {
		o.Max = 1
	}
	l = &GoroutineLimiter{
		c: sync.NewCond(&sync.Mutex{}),
		o: o,
	}
	l.ctx, l.cancel = context.WithCancel(context.Background())
	go l.handleCtx()
	return
}
// Close closes the limiter properly by cancelling its context, which wakes up
// any goroutine blocked in Do(). It always returns nil.
func (l *GoroutineLimiter) Close() error {
	l.cancel()
	return nil
}
// handleCtx waits for the limiter context to be cancelled, then broadcasts on
// the cond so that every goroutine blocked in Do() wakes up and can observe
// the cancellation.
func (l *GoroutineLimiter) handleCtx() {
	<-l.ctx.Done()
	l.c.L.Lock()
	l.c.Broadcast()
	l.c.L.Unlock()
}
// GoroutineLimiterFunc is a GoroutineLimiter func executed by Do() in its own goroutine.
type GoroutineLimiterFunc func()
// Do executes custom work in a goroutine, blocking until a slot is available
// (i.e. fewer than Max funcs are running).
// It returns the context error if the limiter has been closed, in which case
// fn is not executed.
func (l *GoroutineLimiter) Do(fn GoroutineLimiterFunc) (err error) {
	// Check context in case the limiter has already been closed
	if err = l.ctx.Err(); err != nil {
		return
	}

	// Lock
	l.c.L.Lock()

	// Wait for a goroutine to be available. Re-check the context on every
	// wake-up: the close Broadcast may fire while busy is still >= Max, and
	// without this check the waiter would go back to sleep forever.
	for l.busy >= l.o.Max {
		if err = l.ctx.Err(); err != nil {
			l.c.L.Unlock()
			return
		}
		l.c.Wait()
	}

	// Check context in case the limiter has been closed while waiting.
	// The lock MUST be released here, otherwise any subsequent Do() would
	// deadlock on it.
	if err = l.ctx.Err(); err != nil {
		l.c.L.Unlock()
		return
	}

	// Increment
	l.busy++

	// Unlock
	l.c.L.Unlock()

	// Execute in a goroutine
	go func() {
		// Decrement and free a slot for the next waiter
		defer func() {
			l.c.L.Lock()
			l.busy--
			l.c.Signal()
			l.c.L.Unlock()
		}()

		// Execute
		fn()
	}()
	return
}
// Eventer represents an object that can dispatch simple events (name + payload)
type Eventer struct {
	c  *Chan                      // Executes handlers sequentially
	hs map[string][]EventerHandler // Handlers indexed by event name
	mh *sync.Mutex                // Locks hs
}
// EventerOptions represents Eventer options
type EventerOptions struct {
	// Chan configures the underlying Chan used to execute handlers.
	Chan ChanOptions
}
// EventerHandler represents a function that can handle the payload of an event
type EventerHandler func(payload interface{})
// NewEventer creates a new eventer with the provided options.
func NewEventer(o EventerOptions) (e *Eventer) {
	e = &Eventer{mh: &sync.Mutex{}}
	e.c = NewChan(o.Chan)
	e.hs = map[string][]EventerHandler{}
	return
}
// On adds an handler for a specific name.
func (e *Eventer) On(name string, h EventerHandler) {
	e.mh.Lock()
	defer e.mh.Unlock()
	hs := e.hs[name]
	e.hs[name] = append(hs, h)
}
// Dispatch dispatches a payload for a specific name by queueing every
// registered handler on the eventer's chan.
func (e *Eventer) Dispatch(name string, payload interface{}) {
	// Snapshot the handlers under lock, then release the lock BEFORE calling
	// Add(). Holding the lock while adding could deadlock when the chan uses
	// ChanAddStrategyBlockWhenStarted: Add() blocks until the handler has
	// run, and a handler calling On()/Dispatch() would then wait forever on
	// the same mutex.
	e.mh.Lock()
	hs := make([]EventerHandler, len(e.hs[name]))
	copy(hs, e.hs[name])
	e.mh.Unlock()

	// No handlers
	if len(hs) == 0 {
		return
	}

	// Loop through handlers
	for _, h := range hs {
		h := h // Capture the loop variable for the closure
		e.c.Add(func() {
			h(payload)
		})
	}
}
// Start starts the eventer. It is blocking and returns once ctx is cancelled.
func (e *Eventer) Start(ctx context.Context) {
	e.c.Start(ctx)
}
// Stop stops the eventer by stopping the underlying chan.
func (e *Eventer) Stop() {
	e.c.Stop()
}
// Reset resets the eventer by dropping any event not yet handled.
func (e *Eventer) Reset() {
	e.c.Reset()
}
// DebugMutex represents a rwmutex capable of logging its actions to ease deadlock debugging
type DebugMutex struct {
	l               CompleteLogger
	lastCaller      string      // file:line of the last caller that acquired the lock
	lastCallerMutex *sync.Mutex // Guards lastCaller
	ll              LoggerLevel // Level used for lock logging; below LoggerLevelDebug disables it
	m               *sync.RWMutex
	name            string
	timeout         time.Duration // Deadlock-detection timeout; <= 0 disables detection
}
// DebugMutexOpt represents a debug mutex option applied by NewDebugMutex.
type DebugMutexOpt func(m *DebugMutex)
// DebugMutexWithLockLogging allows logging all mutex locks at the provided level.
func DebugMutexWithLockLogging(ll LoggerLevel) DebugMutexOpt {
	return func(dm *DebugMutex) { dm.ll = ll }
}
// DebugMutexWithDeadlockDetection allows detecting deadlock for all mutex
// locks taking longer than the provided timeout.
func DebugMutexWithDeadlockDetection(timeout time.Duration) DebugMutexOpt {
	return func(dm *DebugMutex) { dm.timeout = timeout }
}
// NewDebugMutex creates a new debug mutex.
// By default lock logging is disabled (level below debug) and no deadlock
// detection is performed; use the DebugMutexWith* options to enable them.
func NewDebugMutex(name string, l StdLogger, opts ...DebugMutexOpt) *DebugMutex {
	dm := &DebugMutex{
		l:               AdaptStdLogger(l),
		lastCallerMutex: &sync.Mutex{},
		ll:              LoggerLevelDebug - 1,
		m:               &sync.RWMutex{},
		name:            name,
	}
	for _, o := range opts {
		o(dm)
	}
	return dm
}
// caller returns the "file:line" of the code that invoked the exported
// Lock/RLock method (hence the skip count of 2), or "" if unavailable.
func (m *DebugMutex) caller() (o string) {
	_, file, line, ok := runtime.Caller(2)
	if !ok {
		return
	}
	return fmt.Sprintf("%s:%d", file, line)
}
// log writes a lock-logging message at the configured level.
// It is a no-op unless lock logging was enabled via DebugMutexWithLockLogging.
// The format parameter was renamed from "fmt" to stop shadowing the fmt package.
func (m *DebugMutex) log(format string, args ...interface{}) {
	if m.ll < LoggerLevelDebug {
		return
	}
	m.l.Writef(m.ll, format, args...)
}
// watchTimeout runs fn while watching for it to take longer than the
// configured timeout. If the timeout fires before fn returns, an error is
// logged with the current caller and the last caller that acquired the lock,
// to help locate the deadlock. fn itself is never interrupted.
func (m *DebugMutex) watchTimeout(caller string, fn func()) {
	// Deadlock detection disabled: just run fn
	if m.timeout <= 0 {
		fn()
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), m.timeout)
	defer cancel()
	go func() {
		// Done fires either when the deadline expires or when cancel() runs
		// after fn has returned
		<-ctx.Done()
		// Only log on DeadlineExceeded: a plain Canceled error means fn
		// completed in time
		if err := ctx.Err(); err != nil && errors.Is(err, context.DeadlineExceeded) {
			m.lastCallerMutex.Lock()
			lastCaller := m.lastCaller
			m.lastCallerMutex.Unlock()
			m.l.Errorf("astikit: %s mutex timed out at %s with last caller at %s", m.name, caller, lastCaller)
		}
	}()
	fn()
}
// Lock write locks the mutex, logging the request/acquisition and recording
// the caller for deadlock diagnostics.
func (m *DebugMutex) Lock() {
	caller := m.caller()
	m.log("astikit: requesting lock for %s at %s", m.name, caller)
	m.watchTimeout(caller, m.m.Lock)
	m.log("astikit: lock acquired for %s at %s", m.name, caller)
	m.lastCallerMutex.Lock()
	defer m.lastCallerMutex.Unlock()
	m.lastCaller = caller
}
// Unlock write unlocks the mutex and logs the action.
func (m *DebugMutex) Unlock() {
	m.m.Unlock()
	m.log("astikit: unlock executed for %s", m.name)
}
// RLock read locks the mutex, logging the request/acquisition and recording
// the caller for deadlock diagnostics.
func (m *DebugMutex) RLock() {
	caller := m.caller()
	m.log("astikit: requesting rlock for %s at %s", m.name, caller)
	m.watchTimeout(caller, m.m.RLock)
	m.log("astikit: rlock acquired for %s at %s", m.name, caller)
	m.lastCallerMutex.Lock()
	defer m.lastCallerMutex.Unlock()
	m.lastCaller = caller
}
// RUnlock read unlocks the mutex and logs the action.
func (m *DebugMutex) RUnlock() {
	m.m.RUnlock()
	// Log "runlock" (not "unlock") so read and write unlocks can be told
	// apart in the logs, matching the "rlock" wording used by RLock()
	m.log("astikit: runlock executed for %s", m.name)
}
// AtomicDuration is a time.Duration that can be safely read and updated from
// multiple goroutines.
type AtomicDuration struct {
	d time.Duration
	m *sync.Mutex // Guards d
}
// NewAtomicDuration creates a new AtomicDuration initialized to d.
func NewAtomicDuration(d time.Duration) *AtomicDuration {
	a := &AtomicDuration{m: &sync.Mutex{}}
	a.d = d
	return a
}
// Add atomically adds delta to the duration.
func (d *AtomicDuration) Add(delta time.Duration) {
	d.m.Lock()
	d.d += delta
	d.m.Unlock()
}
// Duration atomically returns the current duration.
func (d *AtomicDuration) Duration() (v time.Duration) {
	d.m.Lock()
	v = d.d
	d.m.Unlock()
	return
}