-
-
Notifications
You must be signed in to change notification settings - Fork 450
/
fanin.go
executable file
·69 lines (54 loc) · 1.29 KB
/
fanin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main
import (
"fmt"
"sync"
)
// generator starts a goroutine that emits the integers in the half-open
// range [start, end) in ascending order and returns the receive-only channel
// they are delivered on. The channel is closed after the last value is sent;
// when start >= end it is closed without sending anything.
func generator(start, end int) <-chan int {
	out := make(chan int) // unbuffered: every send waits for a receiver

	go func() {
		// Closing tells a ranging consumer that no more values will arrive.
		defer close(out)
		for n := start; n < end; n++ {
			out <- n
		}
	}()

	return out
}
// fanIn merges any number of receive-only int channels into one channel.
// One goroutine per input forwards that input's values to the merged
// channel, so values from different inputs may interleave in any order.
// The merged channel is closed once every input has been closed and drained;
// with no inputs it is closed without delivering any values.
func fanIn(chans ...<-chan int) chan int {
	merged := make(chan int)
	var pending sync.WaitGroup

	for _, src := range chans {
		pending.Add(1)
		// src is passed as an argument so each forwarder owns its own input.
		go func(in <-chan int) {
			defer pending.Done()
			for v := range in {
				merged <- v
			}
		}(src)
	}

	// Close merged only after all forwarders are done, so no forwarder can
	// send on a closed channel.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
// main builds four generators over disjoint ranges, merges them with fanIn,
// and prints each merged value on its own line.
func main() {
	sources := []<-chan int{
		generator(1, 10),
		generator(20, 30),
		generator(40, 50),
		generator(60, 70),
	}

	// Ranging over the merged channel blocks while no value is ready and
	// stops once the channel has been closed.
	for v := range fanIn(sources...) {
		fmt.Println(v)
	}
}