-
Notifications
You must be signed in to change notification settings - Fork 16
/
queue_publisher.rb
95 lines (79 loc) · 2.62 KB
/
queue_publisher.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# Publishes JSON messages and heartbeats to a RabbitMQ topic exchange through
# a Bunny connection, using publisher confirms to detect failed publishes.
#
# When constructed with `noop: true` the instance holds no connection state
# and `send_message` / `send_heartbeat` return immediately without publishing.
class QueuePublisher
  # Reported (not raised) via GovukError when the broker does not confirm a
  # published message.
  class PublishFailedError < StandardError
  end

  # @param options [Hash] publisher configuration
  #   - :noop     when truthy, nothing else is initialised and sends are skipped
  #   - :exchange name of the topic exchange to publish to (required unless noop)
  #   - every remaining key is passed through to `Bunny.new`
  # @raise [KeyError] if :exchange is missing and :noop is not set
  def initialize(options = {})
    @noop = options[:noop]
    return if @noop

    @exchange_name = options.fetch(:exchange)
    @options = options.except(:exchange)
    @connection_mutex = Mutex.new
  end

  # Lazily builds the Bunny session (guarded by a mutex) and starts it.
  #
  # NOTE(review): `start` is invoked on every call; presumably it is a no-op on
  # an already-started session - confirm against the Bunny version in use.
  # Not usable on a noop instance, because the mutex is never created there.
  #
  # @return the value returned by `@connection.start`
  def connection
    @connection_mutex.synchronize do
      @connection ||= Bunny.new(ENV["RABBITMQ_URL"], @options)
      @connection.start
    end
  end

  # Validates the edition against the notification schema (logging only) and
  # publishes it as a JSON message.
  #
  # @param edition [Hash] message payload
  # @param event_type [String, nil] suffix for the derived routing key;
  #   defaults to the edition's :update_type
  # @param routing_key [String, nil] explicit routing key; derived from the
  #   edition when nil
  # @param persistent [Boolean] forwarded to the publish call as :persistent
  def send_message(edition, event_type: nil, routing_key: nil, persistent: true)
    return if @noop

    validate_edition(edition)
    # The parenthesised call below invokes the #routing_key method, not the
    # keyword argument of the same name.
    routing_key ||= routing_key(edition, event_type)
    publish_message(routing_key, edition, content_type: "application/json", persistent: persistent)
  end

  # Builds the routing key "<schema_name>.<event_type>" for an edition.
  #
  # @param edition [Hash] payload with :schema_name and (optionally) :update_type
  # @param event_type [String, nil] falls back to the edition's :update_type
  # @return [String]
  def routing_key(edition, event_type)
    normalised = edition.symbolize_keys
    event_type ||= normalised[:update_type]
    "#{normalised[:schema_name]}.#{event_type}"
  end

  # Publishes a non-persistent heartbeat carrying the current UTC timestamp and
  # this machine's hostname, with routing key "heartbeat.major".
  def send_heartbeat
    return if @noop

    body = {
      timestamp: Time.now.utc.iso8601,
      hostname: Socket.gethostname,
    }
    publish_message("heartbeat.major", body, content_type: "application/x-heartbeat", persistent: false)
  end

private

  # Logs (at debug level) when the edition does not match the notification
  # schema. Never raises on invalid payloads and never blocks publishing.
  def validate_edition(edition)
    validator = SchemaValidator.new(payload: edition, schema_type: :notification)
    return if validator.valid?

    Rails.logger.debug(
      {
        message: "Message being sent to the queue does not match the notification schema",
        error: validator.errors.to_s,
        edition: edition,
      }.to_json,
    )
  end

  # Publishes `message_data` as JSON on a fresh channel, waits for the broker's
  # confirm and records the outcome in statsd (plus an error report on failure).
  #
  # @param routing_key [String] routing key; its last dot-separated segment is
  #   used as the event type in the statsd metric name
  # @param message_data [#to_json] payload
  # @param options [Hash] extra publish options (merged with :routing_key)
  def publish_message(routing_key, message_data, options = {})
    # we should only have one channel per thread
    channel = connection.create_channel

    # Everything after channel creation runs under the ensure, so a failure in
    # confirm_select or the passive exchange lookup cannot leak the channel.
    begin
      # Enable publisher confirms, so we get acks back after publishes
      channel.confirm_select

      # passive parameter ensures we don't create the exchange
      exchange = channel.topic(@exchange_name, passive: true)

      publish_options = options.merge(routing_key: routing_key)
      exchange.publish(message_data.to_json, publish_options)
      success = exchange.wait_for_confirms

      event_type = routing_key.split(".").last
      if success
        PublishingAPI.service(:statsd).increment("message-sent.#{event_type}")
      else
        report_publish_failure(routing_key, message_data, options)
        PublishingAPI.service(:statsd).increment("message-send-failure.#{event_type}")
      end
    ensure
      channel.close if channel.open?
    end
  end

  # Sends a PublishFailedError to the error tracker with the message context.
  def report_publish_failure(routing_key, message_data, options)
    GovukError.notify(
      PublishFailedError.new("Publishing message failed"),
      level: "error",
      extra: {
        routing_key: routing_key,
        message_body: message_data,
        options: options,
      },
    )
  end
end