forked from tendermint/tendermint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
async.go
175 lines (155 loc) · 4.56 KB
/
async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package common
import (
"sync/atomic"
)
//----------------------------------------
// Task
// val: the value returned after task execution.
// err: the error returned during task completion.
// abort: tells Parallel to return, whether or not all tasks have completed.
type Task func(i int) (val interface{}, err error, abort bool)
type TaskResult struct {
Value interface{}
Error error
}
type TaskResultCh <-chan TaskResult
type taskResultOK struct {
TaskResult
OK bool
}
type TaskResultSet struct {
chz []TaskResultCh
results []taskResultOK
}
func newTaskResultSet(chz []TaskResultCh) *TaskResultSet {
return &TaskResultSet{
chz: chz,
results: make([]taskResultOK, len(chz)),
}
}
func (trs *TaskResultSet) Channels() []TaskResultCh {
return trs.chz
}
func (trs *TaskResultSet) LatestResult(index int) (TaskResult, bool) {
if len(trs.results) <= index {
return TaskResult{}, false
}
resultOK := trs.results[index]
return resultOK.TaskResult, resultOK.OK
}
// NOTE: Not concurrency safe.
// Writes results to trs.results without waiting for all tasks to complete.
func (trs *TaskResultSet) Reap() *TaskResultSet {
for i := 0; i < len(trs.results); i++ {
var trch = trs.chz[i]
select {
case result, ok := <-trch:
if ok {
// Write result.
trs.results[i] = taskResultOK{
TaskResult: result,
OK: true,
}
} else {
// We already wrote it.
}
default:
// Do nothing.
}
}
return trs
}
// NOTE: Not concurrency safe.
// Like Reap() but waits until all tasks have returned or panic'd.
func (trs *TaskResultSet) Wait() *TaskResultSet {
for i := 0; i < len(trs.results); i++ {
var trch = trs.chz[i]
result, ok := <-trch
if ok {
// Write result.
trs.results[i] = taskResultOK{
TaskResult: result,
OK: true,
}
} else {
// We already wrote it.
}
}
return trs
}
// Returns the firstmost (by task index) error as
// discovered by all previous Reap() calls.
func (trs *TaskResultSet) FirstValue() interface{} {
for _, result := range trs.results {
if result.Value != nil {
return result.Value
}
}
return nil
}
// Returns the firstmost (by task index) error as
// discovered by all previous Reap() calls.
func (trs *TaskResultSet) FirstError() error {
for _, result := range trs.results {
if result.Error != nil {
return result.Error
}
}
return nil
}
//----------------------------------------
// Parallel
// Run tasks in parallel, with ability to abort early.
// Returns ok=false iff any of the tasks returned abort=true.
// NOTE: Do not implement quit features here. Instead, provide convenient
// concurrent quit-like primitives, passed implicitly via Task closures. (e.g.
// it's not Parallel's concern how you quit/abort your tasks).
func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
var taskResultChz = make([]TaskResultCh, len(tasks)) // To return.
var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received.
var numPanics = new(int32) // Keep track of panics to set ok=false later.
ok = true // We will set it to false iff any tasks panic'd or returned abort.
// Start all tasks in parallel in separate goroutines.
// When the task is complete, it will appear in the
// respective taskResultCh (associated by task index).
for i, task := range tasks {
var taskResultCh = make(chan TaskResult, 1) // Capacity for 1 result.
taskResultChz[i] = taskResultCh
go func(i int, task Task, taskResultCh chan TaskResult) {
// Recovery
defer func() {
if pnk := recover(); pnk != nil {
atomic.AddInt32(numPanics, 1)
// Send panic to taskResultCh.
taskResultCh <- TaskResult{nil, ErrorWrap(pnk, "Panic in task")}
// Closing taskResultCh lets trs.Wait() work.
close(taskResultCh)
// Decrement waitgroup.
taskDoneCh <- false
}
}()
// Run the task.
var val, err, abort = task(i)
// Send val/err to taskResultCh.
// NOTE: Below this line, nothing must panic/
taskResultCh <- TaskResult{val, err}
// Closing taskResultCh lets trs.Wait() work.
close(taskResultCh)
// Decrement waitgroup.
taskDoneCh <- abort
}(i, task, taskResultCh)
}
// Wait until all tasks are done, or until abort.
// DONE_LOOP:
for i := 0; i < len(tasks); i++ {
abort := <-taskDoneCh
if abort {
ok = false
break
}
}
// Ok is also false if there were any panics.
// We must do this check here (after DONE_LOOP).
ok = ok && (atomic.LoadInt32(numPanics) == 0)
return newTaskResultSet(taskResultChz).Reap(), ok
}