forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
round_robin.go
147 lines (125 loc) · 3.49 KB
/
round_robin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package broker
import (
"sync/atomic"
"time"
"github.com/dafanshu/benthos/v3/internal/component/output"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
// RoundRobin is a broker that implements types.Consumer and sends each message
// out to a single consumer chosen from an array in round-robin fashion.
// Consumers that apply backpressure will block all consumers.
type RoundRobin struct {
running int32
stats metrics.Type
transactions <-chan types.Transaction
outputTSChans []chan types.Transaction
outputs []types.Output
closedChan chan struct{}
closeChan chan struct{}
}
// NewRoundRobin creates a new RoundRobin type by providing consumers.
func NewRoundRobin(outputs []types.Output, stats metrics.Type) (*RoundRobin, error) {
o := &RoundRobin{
running: 1,
stats: stats,
transactions: nil,
outputs: outputs,
closedChan: make(chan struct{}),
closeChan: make(chan struct{}),
}
o.outputTSChans = make([]chan types.Transaction, len(o.outputs))
for i := range o.outputTSChans {
o.outputTSChans[i] = make(chan types.Transaction)
if err := o.outputs[i].Consume(o.outputTSChans[i]); err != nil {
return nil, err
}
}
return o, nil
}
//------------------------------------------------------------------------------
// Consume assigns a new messages channel for the broker to read.
func (o *RoundRobin) Consume(ts <-chan types.Transaction) error {
if o.transactions != nil {
return types.ErrAlreadyStarted
}
o.transactions = ts
go o.loop()
return nil
}
// Connected returns a boolean indicating whether this output is currently
// connected to its target.
func (o *RoundRobin) Connected() bool {
for _, out := range o.outputs {
if !out.Connected() {
return false
}
}
return true
}
// MaxInFlight returns the maximum number of in flight messages permitted by the
// output. This value can be used to determine a sensible value for parent
// outputs, but should not be relied upon as part of dispatcher logic.
func (o *RoundRobin) MaxInFlight() (m int, ok bool) {
for _, out := range o.outputs {
if mif, exists := output.GetMaxInFlight(out); exists && mif > m {
m = mif
ok = true
}
}
return
}
//------------------------------------------------------------------------------
// loop is an internal loop that brokers incoming messages to many outputs.
func (o *RoundRobin) loop() {
defer func() {
for _, c := range o.outputTSChans {
close(c)
}
closeAllOutputs(o.outputs)
close(o.closedChan)
}()
var (
mMsgsRcvd = o.stats.GetCounter("messages.received")
)
i := 0
var open bool
for atomic.LoadInt32(&o.running) == 1 {
var ts types.Transaction
select {
case ts, open = <-o.transactions:
if !open {
return
}
case <-o.closeChan:
return
}
mMsgsRcvd.Incr(1)
select {
case o.outputTSChans[i] <- ts:
case <-o.closeChan:
return
}
i++
if i >= len(o.outputTSChans) {
i = 0
}
}
}
// CloseAsync shuts down the RoundRobin broker and stops processing requests.
func (o *RoundRobin) CloseAsync() {
if atomic.CompareAndSwapInt32(&o.running, 1, 0) {
close(o.closeChan)
}
}
// WaitForClose blocks until the RoundRobin broker has closed down.
func (o *RoundRobin) WaitForClose(timeout time.Duration) error {
select {
case <-o.closedChan:
case <-time.After(timeout):
return types.ErrTimeout
}
return nil
}
//------------------------------------------------------------------------------