-
Notifications
You must be signed in to change notification settings - Fork 7
/
optimus.go
145 lines (129 loc) · 3.64 KB
/
optimus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package optimus
import "sync"
// Table is a representation of a table of data. It delivers its Rows over a channel, reports the
// first error it encountered, and can be asked to stop producing Rows early.
type Table interface {
// Rows returns a receive-only channel that provides the Rows in the table.
Rows() <-chan Row
// Err returns the first non-EOF error that was encountered by the Table.
Err() error
// Stop signifies that a Table should stop sending Rows down its channel.
// A Table is also responsible for calling Stop on any upstream Tables it knows about.
// Stop should be idempotent. It's expected that Stop will never be called by a consumer of a
// Table unless that consumer is also a Table. It can be used to Stop all upstream Tables in
// the event of an error that needs to halt the pipeline.
Stop()
}
// Sink is a function that takes a Table and consumes all of its Rows, returning an error if
// consumption failed.
type Sink func(Table) error
// Row is a representation of a line of data in a Table. It maps string keys (presumably column
// or field names) to values of arbitrary type.
type Row map[string]interface{}
// TransformFunc is a function that can be applied to a Table to transform it. It should receive the
// Rows from in and may send any number of Rows to out. It should not return until it has finished
// all work (received all the Rows it's going to receive, sent all the Rows it's going to send).
// A non-nil returned error is reported by the Err method of the Table created by Transform.
type TransformFunc func(in <-chan Row, out chan<- Row) error
// Transform wraps source in a new Table whose Rows are the Rows of source after being passed
// through the given TransformFunc.
func Transform(source Table, transform TransformFunc) Table {
	transformed := newTransformedTable(source, transform)
	return transformed
}
// transformedTable is the Table implementation returned by Transform. It wraps a source Table and
// exposes the Rows produced by running a TransformFunc over that source.
type transformedTable struct {
// source is the upstream Table; Stop is forwarded to it and start reads its Rows and Err.
source Table
// err is the value reported by Err; it is assigned by start.
err error
// rows is the channel handed out by Rows; start sends to it and closes it when finished.
rows chan Row
// m guards stopped.
m sync.Mutex
// stopped records that Stop has been called on this Table.
stopped bool
}
// Rows returns the receive-only view of the channel on which the transformed Rows are delivered.
func (t *transformedTable) Rows() <-chan Row {
	var rows <-chan Row = t.rows
	return rows
}
// Err returns the error recorded for this Table, or nil if none has been recorded.
func (t *transformedTable) Err() error {
	err := t.err
	return err
}
// Stop marks this Table as stopped and forwards the Stop to the source Table.
//
// Stop is idempotent: the stopped flag is tested and set inside a single critical section, so
// even when several goroutines call Stop concurrently exactly one of them propagates the call to
// the source Table.
func (t *transformedTable) Stop() {
	t.m.Lock()
	alreadyStopped := t.stopped
	t.stopped = true
	t.m.Unlock()

	if alreadyStopped {
		return
	}
	// Call upstream outside the lock so a slow or re-entrant source cannot block readers of
	// the stopped flag.
	t.source.Stop()
}
// drain receives from c, discarding every Row, and returns once c has been closed.
func drain(c <-chan Row) {
	for {
		if _, open := <-c; !open {
			return
		}
	}
}
// start runs the transformation pipeline for t and blocks until it has finished. It is intended
// to be run in its own goroutine.
//
// Three helper goroutines are used:
//   - one runs transform(in, out);
//   - one feeds Rows from the source Table into in;
//   - one copies Rows from out into t.rows.
//
// When start returns, t.Stop has been called, the source's channel and out have been drained and
// t.rows has been closed. t.err holds the TransformFunc's error if it returned one; otherwise it
// holds the source Table's error, if any.
func (t *transformedTable) start(transform TransformFunc) {
	// A level of indirection is necessary between the i/o channels and the TransformFunc so that
	// the TransformFunc doesn't need to know about the stop state of any of the Tables.
	in := make(chan Row)
	out := make(chan Row)
	errChan := make(chan error)

	// transformDone is closed as soon as the TransformFunc returns; from then on nothing
	// receives from in, so the feeder must not block on sending to it.
	transformDone := make(chan struct{})
	// inDone and outDone are closed by the feeder and the output copier when they exit. Closing
	// (rather than sending) means they can never block, even if start has already returned.
	inDone := make(chan struct{})
	outDone := make(chan struct{})
	// quit is closed when start is shutting down, releasing an output copier that is blocked
	// sending to t.rows.
	quit := make(chan struct{})

	defer func() {
		t.Stop()
		close(quit)
		drain(t.source.Rows())
		drain(out)
		// t.rows may only be closed once its sole sender has exited; closing it while the
		// copier is still sending would panic.
		<-outDone
		close(t.rows)
	}()

	// Run the TransformFunc. Once it has returned, close the out and error channels.
	go func() {
		defer close(errChan)
		defer close(out)
		err := transform(in, out)
		close(transformDone)
		if err != nil {
			errChan <- err
		}
	}()

	// Copy from the TransformFunc's out channel to the Table's out channel, then signal done.
	go func() {
		defer close(outDone)
		for row := range out {
			t.m.Lock()
			stopped := t.stopped
			t.m.Unlock()
			if stopped {
				// Keep receiving so the TransformFunc is never blocked on out.
				continue
			}
			select {
			case t.rows <- row:
			case <-quit:
				// start is shutting down; drop the Row instead of blocking.
			}
		}
	}()

	// Copy from the Table's source to the TransformFunc's in channel, then signal done.
	go func() {
		defer close(inDone)
		defer close(in)
		for row := range t.source.Rows() {
			t.m.Lock()
			stopped := t.stopped
			t.m.Unlock()
			if stopped {
				// Keep receiving so the source is never blocked on its channel.
				continue
			}
			select {
			case in <- row:
			case <-transformDone:
				// The TransformFunc has returned and will not receive this Row.
			}
		}
	}()

	// errChan yields at most one value: the TransformFunc's error. It is closed without a value
	// when the TransformFunc succeeds.
	if err, ok := <-errChan; ok {
		t.err = err
		return
	}

	// Wait for all channels to finish.
	<-outDone // We've consumed the output of the TransformFunc
	<-inDone  // We've consumed the output of the source Table

	if err := t.source.Err(); err != nil {
		t.err = err
	}
}
// newTransformedTable builds a transformedTable around source with a fresh unbuffered output
// channel, launches its start method with transform in a new goroutine, and returns it as a
// Table.
func newTransformedTable(source Table, transform TransformFunc) Table {
	t := new(transformedTable)
	t.source = source
	t.rows = make(chan Row)
	go t.start(transform)
	return t
}