-
Notifications
You must be signed in to change notification settings - Fork 12
/
runners.go
141 lines (131 loc) · 3.1 KB
/
runners.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package gohalt
import (
"context"
"fmt"
"sync"
)
// Runner defines abstraction to execute a set of `Runnable`
// and return possible execution error back.
// Runner is designed to simplify work with throttlers
// by managing `Acquire`/`Release` loop.
type Runner interface {
// Run executes single prodived `Runnable` instance.
Run(Runnable)
// Result returns possible execution error back.
Result() error
}
type rsync struct {
thr Throttler
ctx context.Context
err error
report func(error)
}
// NewRunnerSync creates synchronous runner instance
// that runs a set of `Runnable` consecutively
// with regard to the provided context and throttler.
// First occurred error is returned from result.
func NewRunnerSync(ctx context.Context, thr Throttler) Runner {
ctx, cancel := context.WithCancel(ctx)
r := rsync{thr: thr, ctx: ctx}
r.report = func(err error) {
if err != nil {
if r.err == nil {
r.err = err
cancel()
}
log("sync runner error happened %v", err)
}
}
return &r
}
func (r *rsync) Run(run Runnable) {
select {
case <-r.ctx.Done():
r.report(fmt.Errorf("context error has happened %w", r.ctx.Err()))
return
default:
}
defer func() {
if err := r.thr.Release(r.ctx); err != nil {
r.report(fmt.Errorf("throttler error has happened %w", err))
}
}()
if err := r.thr.Acquire(r.ctx); err != nil {
r.report(fmt.Errorf("throttler error has happened %w", err))
return
}
select {
case <-r.ctx.Done():
r.report(fmt.Errorf("context error has happened %w", r.ctx.Err()))
return
default:
}
if err := run(r.ctx); err != nil {
r.report(fmt.Errorf("runnable error has happened %w", err))
return
}
}
func (r *rsync) Result() error {
return r.err
}
type rasync struct {
thr Throttler
ctx context.Context
wg sync.WaitGroup
err error
report func(error)
}
// NewRunnerAsync creates asynchronous runner instance
// that runs a set of `Runnable` simultaneously
// with regard to the provided context and throttler.
// First occurred error is returned from result.
func NewRunnerAsync(ctx context.Context, thr Throttler) Runner {
ctx, cancel := context.WithCancel(ctx)
r := rasync{thr: thr, ctx: ctx}
var once sync.Once
r.report = func(err error) {
if err != nil {
once.Do(func() {
r.err = err
cancel()
})
log("async runner error happened %v", err)
}
}
return &r
}
func (r *rasync) Run(run Runnable) {
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case <-r.ctx.Done():
r.report(fmt.Errorf("context error has happened %w", r.ctx.Err()))
return
default:
}
defer func() {
if err := r.thr.Release(r.ctx); err != nil {
r.report(fmt.Errorf("throttler error has happened %w", err))
}
}()
if err := r.thr.Acquire(r.ctx); err != nil {
r.report(fmt.Errorf("throttler error has happened %w", err))
return
}
select {
case <-r.ctx.Done():
r.report(fmt.Errorf("context error has happened %w", r.ctx.Err()))
return
default:
}
if err := run(r.ctx); err != nil {
r.report(fmt.Errorf("runnable error has happened %w", err))
return
}
}()
}
func (r *rasync) Result() error {
r.wg.Wait()
return r.err
}