-
Notifications
You must be signed in to change notification settings - Fork 33
/
conductor.go
166 lines (139 loc) · 4.3 KB
/
conductor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Package conductor is a small helper to execute work-heavy operations
// in the background that deliver partial results ("result streaming").
package conductor
import (
	"errors"
	"fmt"
	"sync"
	"time"
)
// ErrNoSuchTicket is returned by Pop() if there is no ticket with the given id.
// This is also returned when accessing a previously expired ticket.
type ErrNoSuchTicket struct {
	// ticket is the id that could not be resolved.
	ticket uint64
}

// Error implements the error interface and names the offending ticket.
func (ens ErrNoSuchTicket) Error() string {
	return fmt.Sprintf("no data associated with `%d` (or it timed out)", ens.ticket)
}

// IsNoSuchTicket returns true if `err` is an ErrNoSuchTicket error or wraps
// one somewhere in its chain. A nil `err` yields false.
func IsNoSuchTicket(err error) bool {
	// errors.As walks the Unwrap() chain, so errors that were annotated
	// with extra context on the way up are still recognized.
	var target ErrNoSuchTicket
	return errors.As(err, &target)
}
// ErrNoDataLeft is returned by Pop() if there is no data left to be returned.
// This will only happen if the exec func returned and there are no cached results.
type ErrNoDataLeft struct {
	// ticket is the id whose results are exhausted.
	ticket uint64
}

// Error implements the error interface and names the exhausted ticket.
func (end ErrNoDataLeft) Error() string {
	return fmt.Sprintf("No data left for ticket `%d`", end.ticket)
}

// IsNoDataLeft returns true if `err` is an ErrNoDataLeft error or wraps
// one somewhere in its chain. A nil `err` yields false.
func IsNoDataLeft(err error) bool {
	// errors.As walks the Unwrap() chain, so errors that were annotated
	// with extra context on the way up are still recognized.
	var target ErrNoDataLeft
	return errors.As(err, &target)
}
// Conductor (as in train conductor) yields tickets for jobs.
// There are three operations:
// - Exec: Execute heavy work in the background, caller gets a ticket.
// - Push: Report partial results of the heavy work.
// - Pop: Fetch a partial result from the caller side via the ticket.
//
// Tickets are only job ids, i.e. plain integers.
// The concept is similar to futures, but the results will clean
// themselves up after a certain timeout.
type Conductor struct {
// mu guards ticketCount, tickets and errors.
mu sync.Mutex
// timeout is how long a finished ticket is kept around before it is
// deleted; it is also the maximum time Pop() waits for a result.
timeout time.Duration
// maxData is the buffer capacity of each ticket's result channel.
maxData int
// ticketCount is the id of the most recently issued ticket.
ticketCount uint64
// tickets maps a ticket id to the channel carrying its partial results.
tickets map[uint64]chan interface{}
// errors maps a ticket id to the error its exec func returned, if any.
errors map[uint64]error
}
// New builds a ready-to-use conductor. Unfetched results expire after
// `timeout`; at most `maxData` partial results per ticket are buffered
// in memory.
func New(timeout time.Duration, maxData int) *Conductor {
	cd := new(Conductor)
	cd.timeout = timeout
	cd.maxData = maxData

	// Both lookup tables start out empty and are filled by Exec().
	cd.tickets = map[uint64]chan interface{}{}
	cd.errors = map[uint64]error{}
	return cd
}
// Exec runs `fn` in a background goroutine and hands back the ticket number
// assigned to it. `fn` receives that same ticket and may report partial
// results through Push(). The caller uses the returned ticket to Pop()
// partial results until ErrNoDataLeft is returned.
func (cd *Conductor) Exec(fn func(ticket uint64) error) uint64 {
	cd.mu.Lock()
	defer cd.mu.Unlock()

	// Allocate the next ticket id together with its buffered result channel.
	cd.ticketCount++
	id := cd.ticketCount
	cd.tickets[id] = make(chan interface{}, cd.maxData)

	worker := func() {
		// Remember a failure of the job under its ticket.
		err := fn(id)
		if err != nil {
			cd.mu.Lock()
			cd.errors[id] = err
			cd.mu.Unlock()
		}

		// The job is done: closing the channel tells readers that no
		// further results will arrive.
		cd.mu.Lock()
		resultCh, found := cd.tickets[id]
		if found {
			close(resultCh)
		}
		cd.mu.Unlock()

		// Keep the ticket available for `timeout`, then forget about it.
		time.Sleep(cd.timeout)

		cd.mu.Lock()
		delete(cd.tickets, id)
		delete(cd.errors, id)
		cd.mu.Unlock()
	}

	go worker()
	return id
}
// Push records a new partial result under a specific `ticket`.
// Push will block after pushing more than `maxData` items until a
// consumer makes room via Pop().
//
// It returns the error stored for `ticket` if there is one, and a generic
// error if the ticket is unknown (or already expired).
//
// Push is meant to be called from within the exec func only: Exec() closes
// the ticket's channel once the func returned, and sending on a closed
// channel panics.
func (cd *Conductor) Push(ticket uint64, data interface{}) error {
	cd.mu.Lock()
	if err := cd.errors[ticket]; err != nil {
		cd.mu.Unlock()
		return err
	}

	// Look the channel up while still holding the lock. Reading the map
	// after unlocking would race with the map writes done by Exec().
	ticketCh, ok := cd.tickets[ticket]
	cd.mu.Unlock()

	if !ok {
		return fmt.Errorf("push outside exec fn is forbidden")
	}

	// Send without holding the lock: this may block on a full buffer and
	// Pop() needs the lock to find the channel it drains.
	ticketCh <- data
	return nil
}
// fetchTicketCh looks up the result channel of `ticket` while holding the
// lock. It is its own function so that the unlock can be deferred.
//
// A stored error for the ticket takes precedence over the channel;
// an unknown ticket results in ErrNoSuchTicket.
func (cd *Conductor) fetchTicketCh(ticket uint64) (chan interface{}, error) {
	cd.mu.Lock()
	defer cd.mu.Unlock()

	if storedErr := cd.errors[ticket]; storedErr != nil {
		return nil, storedErr
	}

	if resultCh, known := cd.tickets[ticket]; known {
		return resultCh, nil
	}

	return nil, ErrNoSuchTicket{ticket}
}
// Pop gets the first (FIFO) partial result associated with `ticket`.
// It will return ErrNoDataLeft if there won't be any more values.
// It will return ErrNoSuchTicket if you passed an invalid ticket.
// If the exec func failed, its error is returned instead.
// Pop waits at most the conductor's timeout for a result and returns
// a generic error if none arrived in time.
func (cd *Conductor) Pop(ticket uint64) (interface{}, error) {
	ticketCh, err := cd.fetchTicketCh(ticket)
	if err != nil {
		return nil, err
	}

	// Wait until we get results, but not longer than the timeout.
	timer := time.NewTimer(cd.timeout)
	// Release the timer right away when data arrives first, instead of
	// keeping it pending until it fires.
	defer timer.Stop()

	select {
	case data, ok := <-ticketCh:
		if !ok {
			// Channel closed and drained: the exec func is done.
			return nil, ErrNoDataLeft{ticket}
		}
		return data, nil
	case <-timer.C:
		return nil, fmt.Errorf("pop took too long (%v)", cd.timeout)
	}
}