forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer_cancel.go
85 lines (71 loc) · 1.99 KB
/
producer_cancel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package queuetest
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/queue"
)
// TestSingleProducerConsumer tests buffered events for a producer getting
// cancelled will not be consumed anymore. Concurrent producer/consumer pairs
// might still have active events not yet ACKed (not tested here).
//
// Note: queues not requiring consumers to ACK a events in order to
// return ACKs to the producer are not supported by this test.
func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
fn := withLogOutput(func(t *testing.T) {
var (
i int
N1 = 3
N2 = 10
)
log := NewTestLogger(t)
b := factory()
defer b.Close()
log.Debug("create first producer")
producer := b.Producer(queue.ProducerConfig{
ACK: func(int) {}, // install function pointer, so 'cancel' will remove events
DropOnCancel: true,
})
for ; i < N1; i++ {
log.Debugf("send event %v to first producer", i)
producer.Publish(makeEvent(common.MapStr{
"value": i,
}))
}
// cancel producer
log.Debugf("cancel producer")
producer.Cancel()
// reconnect and send some more events
log.Debug("connect new producer")
producer = b.Producer(queue.ProducerConfig{})
for ; i < N2; i++ {
log.Debugf("send event %v to new producer", i)
producer.Publish(makeEvent(common.MapStr{
"value": i,
}))
}
// consumer all events
consumer := b.Consumer()
total := N2 - N1
events := make([]publisher.Event, 0, total)
for len(events) < total {
batch, err := consumer.Get(-1) // collect all events
if err != nil {
panic(err)
}
events = append(events, batch.Events()...)
batch.ACK()
}
// verify
if total != len(events) {
assert.Equal(t, total, len(events))
return
}
for i, event := range events {
value := event.Content.Fields["value"].(int)
assert.Equal(t, i+N1, value)
}
})
fn(t)
}