-
Notifications
You must be signed in to change notification settings - Fork 339
/
Copy pathidempotent_async_producer_spec.rb
126 lines (102 loc) · 4.25 KB
/
idempotent_async_producer_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# frozen_string_literal: true
describe "Idempotent async producer", functional: true do
let(:producer) { kafka.async_producer(max_retries: 3, delivery_interval: 1, retry_backoff: 1, idempotent: true) }
after do
producer.shutdown
end
example 'duplication if idempotent is not enabled' do
producer = kafka.async_producer(max_retries: 3, delivery_interval: 1, retry_backoff: 1, idempotent: false)
topic = create_random_topic(num_partitions: 3)
producer.produce('Hello', topic: topic, partition: 0)
sleep 2
allow_any_instance_of(Kafka::SocketWithTimeout).to receive(:read).and_raise(Errno::ETIMEDOUT)
producer.produce('Hi', topic: topic, partition: 0)
sleep 2
allow_any_instance_of(Kafka::SocketWithTimeout).to receive(:read).and_call_original
sleep 2
records = kafka.fetch_messages(topic: topic, partition: 0, offset: :earliest)
expect(records.length).to eql(3)
expect(records[0].value).to eql('Hello')
expect(records[1].value).to eql('Hi')
expect(records[2].value).to eql('Hi')
end
example "produces records in normal situation" do
topic = create_random_topic(num_partitions: 3)
250.times do |index|
producer.produce(index.to_s, topic: topic, partition: 0)
producer.produce(index.to_s, topic: topic, partition: 1)
producer.produce(index.to_s, topic: topic, partition: 2)
end
sleep 2
kafka.fetch_messages(topic: topic, partition: 0, offset: :earliest).each_with_index do |record, index|
expect(record.value).to eql(index.to_s)
end
kafka.fetch_messages(topic: topic, partition: 1, offset: :earliest).each_with_index do |record, index|
expect(record.value).to eql(index.to_s)
end
kafka.fetch_messages(topic: topic, partition: 2, offset: :earliest).each_with_index do |record, index|
expect(record.value).to eql(index.to_s)
end
end
example "no duplications if brokers are down while writting" do
topic = create_random_topic(num_partitions: 3)
producer.produce('Hello', topic: topic, partition: 0)
sleep 2
# Simulate the situation that all brokers fail to write
allow_any_instance_of(Kafka::SocketWithTimeout).to receive(:write).and_raise(Errno::ETIMEDOUT)
producer.produce('Hi', topic: topic, partition: 0)
sleep 2
allow_any_instance_of(Kafka::SocketWithTimeout).to receive(:write).and_call_original
sleep 2
records = kafka.fetch_messages(topic: topic, partition: 0, offset: :earliest)
expect(records.length).to eql(2)
expect(records[0].value).to eql('Hello')
expect(records[1].value).to eql('Hi')
end
example "no duplication if brokers are down while reading response" do
topic = create_random_topic(num_partitions: 3)
producer.produce('Hello', topic: topic, partition: 0)
sleep 2
# Simulate the situation that all brokers fail to read
allow_any_instance_of(Kafka::SocketWithTimeout).to receive(:read).and_raise(Errno::ETIMEDOUT)
producer.produce('Hi', topic: topic, partition: 0)
sleep 2
allow_any_instance_of(Kafka::SocketWithTimeout).to receive(:read).and_call_original
sleep 2
records = kafka.fetch_messages(topic: topic, partition: 0, offset: :earliest)
expect(records.length).to eql(2)
expect(records[0].value).to eql('Hello')
expect(records[1].value).to eql('Hi')
end
example "no duplication if one of the brokers are down" do
topic = create_random_topic(num_partitions: 10)
10.times do |index|
producer.produce('Hello', topic: topic, partition: index)
end
sleep 2
# Simulate the situation that only one broker is down
raised = 1
begin
allow_any_instance_of(Kafka::SocketWithTimeout).to receive(:read) do
if raised != 0
raised -= 1
else
raise Errno::ETIMEDOUT
end
end
rescue Kafka::DeliveryFailed
end
10.times do |index|
producer.produce('Hi', topic: topic, partition: index)
end
sleep 2
allow_any_instance_of(Kafka::SocketWithTimeout).to receive(:read).and_call_original
sleep 2
10.times do |index|
records = kafka.fetch_messages(topic: topic, partition: index, offset: :earliest)
expect(records.length).to eql(2)
expect(records[0].value).to eql('Hello')
expect(records[1].value).to eql('Hi')
end
end
end