-
Notifications
You must be signed in to change notification settings - Fork 13
/
tasks.go
executable file
·115 lines (95 loc) · 2.53 KB
/
tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package event
import (
"sync"
// Frameworks
"github.com/djthorpe/gopi/util/errors"
)
////////////////////////////////////////////////////////////////////////////////
// TYPES
// Tasks runs a set of TaskFunc functions in background goroutines and
// coordinates their start-up and shutdown. Start launches the functions and
// Close signals them to stop and reports any errors they returned.
type Tasks struct {
// Embedded wait group: Start adds one count per task and each task's
// goroutine calls Done when the task function has returned; Close waits
// on it.
sync.WaitGroup
// One entry per task launched by Start; reset to nil by Close.
tasks []*task
// Accumulates the non-nil errors returned by task functions.
err errors.CompoundError
}
// TaskFunc is the signature of a function run by Tasks.Start. The function
// may send on start to indicate it is running (Start blocks until it does so
// or returns), and receives on stop when Close asks it to finish. A non-nil
// returned error is recorded and reported by Close.
type TaskFunc func(start chan<- Signal, stop <-chan Signal) error
// Signal is the empty value type carried on the start and stop channels.
type Signal struct{}
// task holds the pair of channels belonging to one running TaskFunc. Start
// creates both channels unbuffered.
type task struct {
// start is sent on by the task function and closed by run once the
// function returns; stop is sent on and then closed by Close.
start, stop chan Signal
}
// DONE is the Signal value that Close sends on each task's stop channel.
var DONE = Signal{}
////////////////////////////////////////////////////////////////////////////////
// PUBLIC METHODS
// Start runs each of funcs in its own goroutine and blocks until every one of
// them has either sent a value on its "start" channel or returned without
// doing so. Every task is recorded so that Close can later signal it to stop,
// and any non-nil error returned by a task is accumulated for Close to report.
func (this *Tasks) Start(funcs ...TaskFunc) {
	// Create any instance variables
	this.new()

	// started is released once per task when the task signals that it has
	// started (or returns); the embedded WaitGroup is released once per task
	// when the task function has returned.
	var started sync.WaitGroup
	started.Add(len(funcs))
	this.Add(len(funcs))

	for _, fn := range funcs {
		t := &task{make(chan Signal), make(chan Signal)}

		// Remember the task so that Close can reach its stop channel
		this.tasks = append(this.tasks, t)

		// Run the task, record its error and mark it as finished.
		// NOTE(review): err.Add is called from several goroutines at once;
		// confirm that errors.CompoundError is safe for concurrent use.
		go func(fn TaskFunc, t *task) {
			defer this.Done()
			if err := this.run(fn, t); err != nil {
				this.err.Add(err)
			}
		}(fn, t)

		// Wait in the background for the start signal. run closes t.start
		// once the task function returns, so this receive also completes for
		// a task that ends without ever signalling that it started.
		go func(t *task) {
			<-t.start
			started.Done()
		}(t)
	}

	// Block until every task has started (or already ended)
	started.Wait()
}
// Close asks every task launched by Start to stop, waits for all of the task
// functions to return and then reports what went wrong, if anything. A task
// function may already have returned before its stop signal is delivered.
//
// The result is nil when the accumulated error set reports success; otherwise
// it is the value of One() when that is non-nil, and failing that a pointer
// to the whole accumulated error set.
func (this *Tasks) Close() error {
	if running := this.tasks; len(running) > 0 {
		// First pass: deliver the stop signal to each task in turn
		for i := range running {
			running[i].stop <- DONE
		}
		// Second pass: close every stop channel
		for i := range running {
			close(running[i].stop)
		}
		// Block until every task function has returned
		this.Wait()
	}

	// Forget the tasks. The accumulated errors are left in place.
	this.tasks = nil

	if this.err.Success() {
		return nil
	}
	if single := this.err.One(); single != nil {
		return single
	}
	return &this.err
}
////////////////////////////////////////////////////////////////////////////////
// PRIVATE METHODS
// new makes sure the task list is a non-nil (possibly empty) slice. An
// existing list is left untouched.
func (this *Tasks) new() {
	if this.tasks != nil {
		return
	}
	this.tasks = make([]*task, 0)
}
// run calls fn with the task's start and stop channels and hands back the
// error fn returns. It only returns once fn has returned.
func (this *Tasks) run(fn TaskFunc, t *task) error {
	// Invoke the task function; this blocks for the lifetime of the task
	result := fn(t.start, t.stop)

	// The task can no longer signal a start, so close the channel. This
	// releases anything still waiting to receive from it.
	close(t.start)

	// Take one value off the stop channel in the background: either the
	// signal sent by Close, or the zero value delivered when the channel
	// is closed.
	go func(stop <-chan Signal) {
		<-stop
	}(t.stop)

	return result
}