-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathexponential_backoff_handler.rb
122 lines (93 loc) · 3.63 KB
/
exponential_backoff_handler.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
module SneakersHandlers
  # Sneakers failure handler that retries failed messages with increasing delays.
  #
  # A failed message is republished to "<exchange>.retry" with the delay appended
  # to its routing key. It lands in a per-delay retry queue declared with a
  # message TTL and with the primary exchange as its dead-letter exchange; the
  # primary queue is bound to "<queue>.<key>.*" so that such messages can be
  # routed back to it. The number of previous attempts is read from the
  # "x-death" header. Once every configured delay has been used, the delivery is
  # rejected instead of being retried.
  #
  # Options read by this class: :exchange, :routing_key, :delays,
  # :exchange_options[:durable] and :queue_options[:durable].
  class ExponentialBackoffHandler
    attr_reader :queue, :channel, :options, :delays

    # @param channel the channel used to declare exchanges/queues and to
    #   ack/reject deliveries (must respond to exchange, queue, acknowledge, reject)
    # @param queue the primary queue (must respond to name and bind)
    # @param options [Hash] handler options, see the class comment
    def initialize(channel, queue, options)
      @queue = queue
      @channel = channel
      @options = options
      # The default delays rely on the numeric duration helpers (1.second, ...)
      # being available; pass :delays explicitly to avoid that dependency.
      @delays = options[:delays] || [1.second, 10.seconds, 1.minute, 10.minutes]

      create_error_exchange!

      # Retried messages carry the delay as an extra trailing routing key
      # segment, hence the ".*" binding on the primary exchange.
      Array(@options[:routing_key]).each do |key|
        queue.bind(primary_exchange, routing_key: "#{queue.name}.#{key}.*")
      end
    end

    # Acknowledges a single delivery (multiple = false).
    def acknowledge(delivery_info, _, _)
      @channel.acknowledge(delivery_info.delivery_tag, false)
    end

    # Worker rejected the message: schedule a retry.
    def reject(delivery_info, properties, message)
      retry_message(delivery_info, properties, message, :reject)
    end

    # Worker raised +err+: schedule a retry.
    def error(delivery_info, properties, message, err)
      retry_message(delivery_info, properties, message, err)
    end

    # Worker timed out: schedule a retry.
    def timeout(delivery_info, properties, message)
      retry_message(delivery_info, properties, message, :timeout)
    end

    # Intentionally does nothing.
    def noop(delivery_info, properties, message); end

    private

    # Republishes +message+ to the retry exchange with the next delay, or
    # rejects the delivery once all delays have been used.
    #
    # The failure reason is accepted for symmetry with the public callbacks but
    # is currently unused.
    def retry_message(delivery_info, properties, message, _reason)
      attempt_number = death_count(properties[:headers])

      if attempt_number < @delays.length
        delay = @delays[attempt_number]
        log("msg=retrying, delay=#{delay}, count=#{attempt_number}, properties=#{properties}")

        routing_key = retry_routing_key(delivery_info[:routing_key], delay)
        retry_queue = create_retry_queue!(delay)
        retry_queue.bind(retry_exchange, routing_key: routing_key)
        retry_exchange.publish(message, routing_key: routing_key, headers: properties[:headers])

        # The retry copy has been published, so the original can be settled.
        acknowledge(delivery_info, properties, message)
      else
        log("msg=erroring, count=#{attempt_number}, properties=#{properties}")
        # A delivery tag must be settled exactly once: reject here and do NOT
        # acknowledge afterwards.
        channel.reject(delivery_info.delivery_tag)
      end
    end

    # Builds "<queue>.<key>.<delay>" from the routing key of the failed delivery.
    # The queue-name prefix is normalised (stripped if present, then re-added)
    # and a trailing integer segment, i.e. the delay of a previous retry, is
    # dropped before the new delay is appended.
    def retry_routing_key(original_key, delay)
      prefix = "#{queue.name}."
      segments = (prefix + original_key.gsub(prefix, "")).split(".")
      segments.pop if integer_string?(segments.last)
      segments << delay
      segments.join(".")
    end

    # True when Kernel#Integer accepts +value+ (nil and non-numeric text do not).
    def integer_string?(value)
      Integer(value)
      true
    rescue ArgumentError, TypeError
      false
    end

    # Sums the "count" of every x-death entry whose queue name starts with the
    # primary queue name (the primary queue itself and its retry queues).
    # Entries for other queues contribute nothing.
    #
    # @return [Integer] 0 when there are no headers or no x-death header
    def death_count(headers)
      return 0 if headers.nil? || headers["x-death"].nil?

      headers["x-death"].inject(0) do |sum, x_death|
        # Literal prefix match: the queue name must not be treated as a regexp.
        if x_death["queue"].to_s.start_with?(queue.name)
          sum + x_death["count"]
        else
          sum
        end
      end
    end

    # Emits a debug line through the Sneakers logger, tagged with the queue name.
    def log(message)
      Sneakers.logger.debug do
        "DelayedRetryHandler handler [queue=#{queue.name}] #{message}"
      end
    end

    # Durability flag for exchanges; nil when :exchange_options is absent.
    def durable_exchanges?
      (options[:exchange_options] || {})[:durable]
    end

    # Durability flag for queues; nil when :queue_options is absent.
    def durable_queues?
      (options[:queue_options] || {})[:durable]
    end

    # Declares a topic exchange called +name+ on the channel.
    def create_exchange(name)
      log("creating exchange=#{name}")
      @channel.exchange(name, type: "topic", durable: durable_exchanges?)
    end

    # Memoized "<exchange>.retry" exchange.
    def retry_exchange
      @retry_exchange ||= create_exchange("#{options[:exchange]}.retry")
    end

    # Memoized primary exchange.
    def primary_exchange
      @primary_exchange ||= create_exchange("#{options[:exchange]}")
    end

    # Declares "<exchange>.error" and a "<queue>.error" queue bound to it for
    # each routing key, both with and without a trailing delay segment.
    def create_error_exchange!
      @error_exchange ||= create_exchange("#{options[:exchange]}.error").tap do |exchange|
        error_queue = @channel.queue("#{@queue.name}.error", durable: durable_queues?)

        Array(@options[:routing_key]).each do |key|
          error_queue.bind(exchange, routing_key: "#{@queue.name}.#{key}")
          error_queue.bind(exchange, routing_key: "#{@queue.name}.#{key}.*")
        end
      end
    end

    # Declares the retry queue for +delay+ (seconds). TTL and expiry are passed
    # in milliseconds; the queue expires after twice the delay, and the primary
    # exchange is set as its dead-letter exchange.
    def create_retry_queue!(delay)
      @channel.queue("#{queue.name}.retry.#{delay}",
        durable: durable_queues?,
        arguments: {
          :"x-dead-letter-exchange" => options[:exchange],
          :"x-message-ttl" => delay * 1_000,
          :"x-expires" => delay * 1_000 * 2
        }
      )
    end
  end
end