This repository has been archived by the owner on Jan 31, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
amqp.rb
94 lines (79 loc) · 2.8 KB
/
amqp.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# Service hook that publishes push events to an AMQP topic exchange.
#
# For every push, one message describing the whole push is published under
# the routing key "github.push.<owner>.<repo>.<ref>", followed by one message
# per commit under "github.commit.<owner>.<repo>.<ref>.<author email>".
# Each message is a JSON document with a '_meta' section (routing key and
# exchange name) and a 'payload' section.
class Service::AMQP < Service
  string     :server, :port, :vhost, :exchange, :username
  password   :password
  white_list :server, :port, :vhost, :exchange, :username

  # Allow the connection and exchange to be injected instead of being
  # lazily built from the configuration data.
  attr_writer :amqp_connection
  attr_writer :amqp_exchange

  # Handles a push event: validates the configuration, reshapes the commits,
  # then publishes the push message and the per-commit messages.
  #
  # Raises a configuration error (via raise_config_error) when neither a
  # host/server nor an exchange has been configured.
  def receive_push
    # Support for specifying as host or server. A blank string is treated the
    # same as a missing value so that an empty 'host' still falls back to
    # 'server' and an empty setting is rejected below.
    data['host'] = setting('host') || setting('server')

    raise_config_error "Invalid server host." unless data['host']
    raise_config_error "Invalid exchange." unless setting('exchange')

    # Modify the commits a bit: group the per-commit file lists under a
    # single 'files' key.
    payload['commits'].each do |commit|
      commit['files'] = {
        'added'    => commit.delete('added'),
        'modified' => commit.delete('modified'),
        'removed'  => commit.delete('removed'),
      }
    end

    owner = payload['repository']['owner']['name']
    repo  = payload['repository']['name']
    ref   = ref_name

    begin
      # Publish the push message to the exchange
      publish_message("github.push.#{owner}.#{repo}.#{ref}", payload)

      # Publish individual commit messages
      payload['commits'].each do |commit|
        author = commit['author']['email']
        publish_message("github.commit.#{owner}.#{repo}.#{ref}.#{author}", commit)
      end
    ensure
      # Close the connection even when publishing fails part-way through.
      # Only an already established (or injected) connection is closed, so a
      # failed connection attempt is not retried here.
      @amqp_connection.close if @amqp_connection
    end
  end

  # Topic exchange named by the 'exchange' setting, created on first use.
  def amqp_exchange
    @amqp_exchange ||= MQ::Exchange.new(amqp_channel,
                                        :topic,
                                        data['exchange'],
                                        :durable => true)
  end

  # Channel opened on the connection, created on first use.
  def amqp_channel
    @amqp_channel ||= MQ.new(amqp_connection)
  end

  # Broker connection built from the configuration data, created on first
  # use. Missing or blank optional settings fall back to the defaults below.
  def amqp_connection
    @amqp_connection ||= ::AMQP.connect(:host    => data['host'],
                                        :port    => setting('port') || 5672,
                                        :user    => setting('username') || 'guest',
                                        :pass    => setting('password') || 'guest',
                                        :vhost   => setting('vhost') || '/',
                                        :logging => false)
  end

  private

  # Returns the configuration value stored under +key+, or nil when it is
  # missing or an empty string.
  def setting(key)
    value = data[key]
    value.to_s.empty? ? nil : value
  end

  # Wraps +body+ in the message envelope and publishes it as JSON to the
  # exchange under +routing_key+.
  def publish_message(routing_key, body)
    msg = {}
    msg['_meta'] = {
      'routing_key' => routing_key,
      'exchange'    => data['exchange'],
    }
    msg['payload'] = body

    amqp_exchange.publish(msg.to_json,
                          :key          => routing_key,
                          :content_type => 'application/json')
  end
end