-
Notifications
You must be signed in to change notification settings - Fork 2
/
context.go
149 lines (130 loc) · 3.48 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package lb
import (
"time"
"github.com/dearcode/libbeat/common/op"
"github.com/dearcode/libbeat/outputs"
"github.com/dearcode/libbeat/outputs/mode"
)
// context is the shared state of the load balancer. It owns the channels
// through which event messages reach the workers and the limits that govern
// how long and how often forwarding a message is attempted, so the
// load-balancing strategy itself lives here.
type context struct {
	// timeout bounds a single send/retry attempt. An attempt that runs into
	// the timeout counts as one failed send attempt.
	timeout time.Duration

	// maxAttempts is the configured number of send attempts; pushEvents
	// stamps it onto every non-guaranteed message.
	//
	// NOTE(review): this field used to be documented as "0 blocks until the
	// event has been published". Within this file only a negative attempt
	// budget blocks indefinitely (see forwardEvent); presumably the code that
	// constructs the context maps 0 to a negative value - confirm.
	maxAttempts int

	// done is the shutdown signal. Close closes it.
	done chan struct{}

	// work is used by the publisher to insert new events into the load
	// balancer. makeContext creates it unbuffered, so a send blocks until a
	// worker is available, the attempt times out, or the context is closed.
	work chan eventsMessage

	// retries forwards failed send attempts to other workers. It is buffered
	// to mitigate possible deadlocks when all workers become unresponsive.
	retries chan eventsMessage
}
// eventsMessage is the unit of work passed over the work and retries channels
// of a context.
type eventsMessage struct {
	// worker is not read or written in this file.
	// NOTE(review): presumably identifies the worker that handled the message
	// - confirm against the worker implementation.
	worker int

	// attemptsLeft is the remaining send budget. A negative value makes
	// forwardEvent block until the message is accepted or the context is
	// closed; zero makes tryPushFailed drop the message.
	attemptsLeft int

	// signaler is handed to op.SigFailed when the message is dropped.
	signaler op.Signaler

	// data and datum hold the payload.
	// NOTE(review): presumably data is a batch and datum a single event, with
	// only one of them set per message - confirm against the callers.
	data  []outputs.Data
	datum outputs.Data
}
// makeContext returns a ready-to-use context.
//
// nClients sizes the retries queue, maxAttempts and timeout are stored
// unchanged. The done and work channels are created unbuffered.
func makeContext(nClients, maxAttempts int, timeout time.Duration) context {
	var ctx context

	ctx.timeout = timeout
	ctx.maxAttempts = maxAttempts
	ctx.done = make(chan struct{})
	ctx.work = make(chan eventsMessage)

	// Two retry slots per client.
	ctx.retries = make(chan eventsMessage, 2*nClients)

	return ctx
}
// Close signals shutdown by closing the done channel, which releases every
// select in this file that waits on it. It always returns nil.
//
// Close must be called at most once: closing an already closed channel panics.
func (ctx *context) Close() error {
	debugf("close context")

	close(ctx.done)

	return nil
}
// pushEvents submits a new message to the workers through the work channel.
//
// The attempt budget of msg is overwritten: -1 (no limit) when guaranteed is
// set, ctx.maxAttempts otherwise. If forwarding fails the message is dropped.
// The return value reports whether the message was forwarded.
func (ctx *context) pushEvents(msg eventsMessage, guaranteed bool) bool {
	if guaranteed {
		msg.attemptsLeft = -1
	} else {
		msg.attemptsLeft = ctx.maxAttempts
	}

	if ctx.forwardEvent(ctx.work, msg) {
		return true
	}

	dropping(msg)
	return false
}
// pushFailed hands a message whose send failed to the retries channel so that
// another worker can pick it up. The attempt budget already stored in msg is
// used as is. If forwarding fails the message is dropped. The return value
// reports whether the message was forwarded.
func (ctx *context) pushFailed(msg eventsMessage) bool {
	if forwarded := ctx.forwardEvent(ctx.retries, msg); forwarded {
		return true
	}

	dropping(msg)
	return false
}
// tryPushFailed is the non-blocking counterpart of pushFailed.
//
// A message with no attempts left is dropped and reported as handled (true).
// Any other message is offered to the retries channel without waiting; the
// result is true if it was queued and false if the queue had no room.
func (ctx *context) tryPushFailed(msg eventsMessage) bool {
	exhausted := msg.attemptsLeft == 0
	if exhausted {
		dropping(msg)
		return true
	}

	queued := false
	select {
	case ctx.retries <- msg:
		queued = true
	default:
		// Queue is full; the caller keeps ownership of the message.
	}
	return queued
}
// forwardEvent tries to hand msg to a worker by sending it on ch.
//
// The attempt budget is read from msg.attemptsLeft:
//   - negative: block until the message is accepted or the context is closed;
//   - positive: make up to that many attempts, each bounded by ctx.timeout;
//   - zero: make no attempt at all.
//
// It returns true once msg has been sent on ch and false on shutdown or when
// the budget is used up. The message delivered on ch carries the number of
// attempts that were still left at the moment the send succeeded. The
// caller's copy of msg is never modified.
func (ctx *context) forwardEvent(ch chan eventsMessage, msg eventsMessage) bool {
	debugf("forwards msg with attempts=%v", msg.attemptsLeft)

	if msg.attemptsLeft < 0 {
		select {
		case ch <- msg:
			debugf("message forwarded")
			return true
		case <-ctx.done: // shutdown
			debugf("shutting down")
			return false
		}
	}

	if msg.attemptsLeft == 0 {
		return false
	}

	// One timer is reused for every attempt instead of calling time.After per
	// iteration: a time.After timer cannot be stopped and, on Go releases
	// before 1.23, stays allocated until it fires even when the send succeeds
	// right away.
	timer := time.NewTimer(ctx.timeout)
	defer timer.Stop()

	for ; msg.attemptsLeft > 0; msg.attemptsLeft-- {
		select {
		case ch <- msg:
			debugf("message forwarded")
			return true
		case <-ctx.done: // shutdown
			debugf("shutting down")
			return false
		case <-timer.C:
			debugf("forward timed out")
			// The timer has expired and its channel has just been drained,
			// which is the state in which Reset is safe to call.
			timer.Reset(ctx.timeout)
		}
	}
	return false
}
// receive returns the next message a worker should process.
//
// Messages already waiting in the retries queue are served first. If there
// are none, receive blocks until a retry or a new work item arrives or the
// context is closed. The boolean is false only when receive gave up because
// of shutdown; the returned message is then the zero value.
func (ctx *context) receive() (eventsMessage, bool) {
	// Non-blocking look at the retries queue so that a pending retry wins
	// over new work.
	select {
	case retried := <-ctx.retries: // receive message from other failed worker
		debugf("events from retries queue")
		return retried, true
	default:
	}

	// Nothing pending: wait for whichever source becomes ready.
	select {
	case retried := <-ctx.retries: // receive message from other failed worker
		debugf("events from retries queue")
		return retried, true
	case fresh := <-ctx.work: // receive message from publisher
		debugf("events from worker worker queue")
		return fresh, true
	case <-ctx.done:
		return eventsMessage{}, false
	}
}
// dropping records that msg is being given up on: it bumps the dropped
// counter and reports failure to the message's signaler.
//
// NOTE(review): the counter is increased by 1 per message, independent of how
// many events the message carries - confirm that this is the intended unit.
func dropping(msg eventsMessage) {
	debugf("messages dropped")

	mode.Dropped(1)

	// No error value is attached to the failure signal.
	op.SigFailed(msg.signaler, nil)
}