-
Notifications
You must be signed in to change notification settings - Fork 3
/
utility-stages.go
128 lines (120 loc) · 3.2 KB
/
utility-stages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package pipeline
import "sync"
// Map is a pipeline stage that transforms every incoming chunk with operation.
//
// Each chunk read from chunks is passed to operation. A successful result is
// forwarded on the returned channel; a failing chunk is dropped and its error
// is sent on errors instead. The returned channel is closed once chunks has
// been closed and fully drained. Both sends block until a receiver is ready
// (unless the caller supplied a buffered errors channel), so both the output
// and errors must be serviced for the stage to make progress.
func Map(chunks <-chan FileChunk, errors chan<- error, operation func(FileChunk) (FileChunk, error)) <-chan FileChunk {
	out := make(chan FileChunk)
	go func() {
		defer close(out)
		for in := range chunks {
			result, err := operation(in)
			if err != nil {
				// Report the failure and skip forwarding this chunk.
				errors <- err
				continue
			}
			out <- result
		}
	}()
	return out
}
// Filter is a pipeline stage that forwards only the chunks accepted by filter.
//
// For every chunk read from chunks, filter is called once. If it returns an
// error, the error is sent on errors and the chunk is dropped. Otherwise the
// chunk is forwarded unchanged when filter reports true and silently dropped
// when it reports false. The returned channel is closed after chunks has been
// closed and drained.
func Filter(chunks <-chan FileChunk, errors chan<- error, filter func(FileChunk) (bool, error)) <-chan FileChunk {
	kept := make(chan FileChunk)
	go func() {
		defer close(kept)
		for candidate := range chunks {
			keep, err := filter(candidate)
			switch {
			case err != nil:
				errors <- err
			case keep:
				kept <- candidate
			}
		}
	}()
	return kept
}
// Separate is a pipeline stage that routes each chunk to one of two outputs.
//
// condition is evaluated once per chunk. Chunks for which it reports true are
// sent on the first returned channel, and chunks for which it reports false
// are sent on the second. If condition returns an error, the error is sent on
// errors and the chunk is sent on neither output. Both outputs are closed
// after chunks has been closed and drained.
func Separate(chunks <-chan FileChunk, errors chan<- error, condition func(FileChunk) (bool, error)) (<-chan FileChunk, <-chan FileChunk) {
	matched, unmatched := make(chan FileChunk), make(chan FileChunk)
	go func() {
		defer func() {
			close(unmatched)
			close(matched)
		}()
		for chunk := range chunks {
			hit, err := condition(chunk)
			if err != nil {
				errors <- err
				continue
			}
			target := unmatched
			if hit {
				target = matched
			}
			target <- chunk
		}
	}()
	return matched, unmatched
}
// Fork duplicates the input stream so that a pipeline can branch.
//
// Every chunk read from chunks is sent first on the first returned channel
// and then on the second. Because the outputs are unbuffered and written in
// that order, the next chunk is not read until both outputs have received the
// current one. Both outputs are closed after chunks has been closed and
// drained.
func Fork(chunks <-chan FileChunk) (<-chan FileChunk, <-chan FileChunk) {
	first, second := make(chan FileChunk), make(chan FileChunk)
	go func() {
		defer func() {
			close(second)
			close(first)
		}()
		for {
			chunk, open := <-chunks
			if !open {
				return
			}
			first <- chunk
			second <- chunk
		}
	}()
	return first, second
}
// Divide distributes the input channel across divisor new channels, which
// are returned in a slice.
//
// Chunks are handed out round-robin: the first chunk goes to element 0 of the
// returned slice, the second to element 1, and so on, wrapping back to
// element 0 after the last one. The output channels are unbuffered, so a
// slow reader on one output stalls distribution to the others. All outputs
// are closed after chunks has been closed and drained.
//
// If divisor is 0 the returned slice is empty. There is nowhere to send
// chunks in that case, so the input is drained and discarded rather than
// crashing the process with a divide-by-zero in the background goroutine.
func Divide(chunks <-chan FileChunk, divisor uint) []chan FileChunk {
	chans := make([]chan FileChunk, divisor)
	for i := range chans {
		chans[i] = make(chan FileChunk)
	}
	go func() {
		defer func() {
			for _, channel := range chans {
				close(channel)
			}
		}()
		if divisor == 0 {
			// No outputs exist; keep upstream stages from blocking forever.
			for range chunks {
			}
			return
		}
		// Track the next output index directly (always < divisor) instead of
		// an ever-growing counter, so the rotation never skews on overflow.
		var next uint
		for chunk := range chunks {
			chans[next] <- chunk
			next = (next + 1) % divisor
		}
	}()
	return chans
}
// Join merges any number of input channels into a single output channel
// (fan-in).
//
// One forwarding goroutine is started per input; each copies its input's
// chunks onto the shared output until that input is closed. Ordering between
// different inputs is therefore not defined. The output is closed once every
// input has been closed and drained; with no inputs it is closed immediately.
func Join(chans ...<-chan FileChunk) <-chan FileChunk {
	merged := make(chan FileChunk)

	var pending sync.WaitGroup
	pending.Add(len(chans))
	for _, source := range chans {
		go func(in <-chan FileChunk) {
			defer pending.Done()
			for chunk := range in {
				merged <- chunk
			}
		}(source)
	}

	// Close the output only after every forwarder has finished.
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}
// Consume drains channel in the background and discards everything it
// receives. It returns immediately; the draining goroutine exits once channel
// is closed.
func Consume(channel <-chan FileChunk) {
	go func() {
		for {
			if _, open := <-channel; !open {
				return
			}
		}
	}()
}