/
roud_robin.go
74 lines (63 loc) · 1.53 KB
/
roud_robin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package instruction
import (
"io"
"log"
"sync"
"sync/atomic"
"github.com/chrislusf/gleam/pb"
"github.com/chrislusf/gleam/util"
)
func init() {
InstructionRunner.Register(func(m *pb.Instruction) Instruction {
if m.GetRoundRobin() != nil {
return NewRoundRobin()
}
return nil
})
}
type RoundRobin struct {
}
func NewRoundRobin() *RoundRobin {
return &RoundRobin{}
}
func (b *RoundRobin) Name(prefix string) string {
return prefix + ".RoundRobin"
}
func (b *RoundRobin) Function() func(readers []io.Reader, writers []io.Writer, stats *pb.InstructionStat) error {
return func(readers []io.Reader, writers []io.Writer, stats *pb.InstructionStat) error {
return DoRoundRobin(readers, writers, stats)
}
}
func (b *RoundRobin) SerializeToCommand() *pb.Instruction {
return &pb.Instruction{
RoundRobin: &pb.Instruction_RoundRobin{},
}
}
func (b *RoundRobin) GetMemoryCostInMB(partitionSize int64) int64 {
return 1
}
func DoRoundRobin(reader []io.Reader, writers []io.Writer, stats *pb.InstructionStat) error {
shardCount := int32(len(writers))
var wg sync.WaitGroup
count := int32(0)
for _, r := range reader {
wg.Add(1)
go func(r io.Reader) {
err := util.ProcessMessage(r, func(data []byte) error {
atomic.AddInt64(&stats.InputCounter, 1)
atomic.AddInt32(&count, 1)
err := util.WriteMessage(writers[count%shardCount], data)
if err == nil {
atomic.AddInt64(&stats.OutputCounter, 1)
}
return err
})
if err != nil {
log.Println(err)
}
wg.Done()
}(r)
}
wg.Wait()
return nil
}