forked from famoseagle/carrot
/
server.rb
216 lines (174 loc) · 5.51 KB
/
server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
require 'socket'
require 'thread'
require 'timeout'
begin
require 'openssl'
rescue LoadError
warn "Unable to load SSL extension - you will be unable to make ssl connections."
end
module Carrot::AMQP
class Server
CONNECT_TIMEOUT = 1.0
RETRY_DELAY = 10.0
DEFAULT_PORT = 5672
attr_reader :host, :port, :status
attr_accessor :channel, :ticket
class ServerDown < StandardError; end
class ProtocolError < StandardError; end
def initialize(opts = {})
@host = opts[:host] || 'localhost'
@port = opts[:port] || DEFAULT_PORT
@user = opts[:user] || 'guest'
@pass = opts[:pass] || 'guest'
@vhost = opts[:vhost] || '/'
@insist = opts[:insist]
@status = 'NOT CONNECTED'
@use_ssl = !!opts[:ssl]
@ssl_verify = opts[:ssl_verify] || OpenSSL::SSL::VERIFY_PEER
@multithread = opts[:multithread]
start_session
end
def send_frame(*args)
args.each do |data|
data.ticket = ticket if ticket and data.respond_to?(:ticket=)
data = data.to_frame(channel) unless data.is_a?(Frame)
data.channel = channel
log :send, data
write(data.to_s)
end
nil
end
def next_frame
frame = Frame.parse(buffer)
log :received, frame
frame
end
def next_method
next_payload
end
def next_payload
frame = next_frame
frame and frame.payload
end
def close
send_frame(
Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, :class_id => 0)
)
puts "Error closing channel #{channel}" unless next_method.is_a?(Protocol::Channel::CloseOk)
self.channel = 0
send_frame(
Protocol::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0)
)
puts "Error closing connection" unless next_method.is_a?(Protocol::Connection::CloseOk)
rescue ServerDown => e
ensure
close_socket
end
def read(*args)
send_command(:read, *args)
end
def write(*args)
send_command(:write, *args)
end
private
def buffer
@buffer ||= Buffer.new(self)
end
def send_command(cmd, *args)
begin
socket.__send__(cmd, *args)
rescue Errno::EPIPE, IOError, Errno::ECONNRESET => e
raise ServerDown, e.message
end
end
def start_ssl
unless defined?(OpenSSL)
raise "SSL Extension not installed"
end
ctx = OpenSSL::SSL::SSLContext.new
ctx.verify_mode = @ssl_verify
@socket = OpenSSL::SSL::SSLSocket.new(@socket, ctx)
@socket.sync_close = true
@socket.connect
if ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE
@socket.post_connection_check(@address)
end
end
def socket
return @socket if @socket and not @socket.closed?
begin
# Attempt to connect.
mutex.lock if multithread?
@socket = timeout(CONNECT_TIMEOUT) do
TCPSocket.new(host, port)
end
if Socket.constants.include? 'TCP_NODELAY'
@socket.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
end
start_ssl if @use_ssl
@status = 'CONNECTED'
rescue SocketError, SystemCallError, IOError, Timeout::Error => e
msg = e.message << " - #{@host}:#{@port}"
raise ServerDown, e.message
ensure
mutex.unlock if multithread?
end
@socket
end
def start_session
@channel = 0
write(HEADER)
write([1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4'))
raise ProtocolError, 'bad start connection' unless next_method.is_a?(Protocol::Connection::Start)
send_frame(
Protocol::Connection::StartOk.new(
{:platform => 'Ruby', :product => 'Carrot', :information => 'http://github.com/famosagle/carrot', :version => VERSION},
'AMQPLAIN',
{:LOGIN => @user, :PASSWORD => @pass},
'en_US'
)
)
method = next_method
raise ProtocolError, "Bad AMQP Credentials. user: #{@user}, pass: #{@pass}" if method.nil?
if method.is_a?(Protocol::Connection::Tune)
send_frame(
Protocol::Connection::TuneOk.new( :channel_max => 0, :frame_max => 131072, :heartbeat => 0)
)
end
send_frame(
Protocol::Connection::Open.new(:virtual_host => @vhost, :capabilities => '', :insist => @insist)
)
raise ProtocolError, 'bad open connection' unless next_method.is_a?(Protocol::Connection::OpenOk)
@channel = 1
send_frame(Protocol::Channel::Open.new)
raise ProtocolError, "cannot open channel #{channel}" unless next_method.is_a?(Protocol::Channel::OpenOk)
send_frame(
Protocol::Access::Request.new(:realm => '/data', :read => true, :write => true, :active => true, :passive => true)
)
method = next_method
raise ProtocolError, 'access denied' unless method.is_a?(Protocol::Access::RequestOk)
self.ticket = method.ticket
end
def multithread?
@multithread
end
def close_socket(reason=nil)
# Close the socket. The server is not considered dead.
mutex.lock if multithread?
@socket.close if @socket and not @socket.closed?
@socket = nil
@status = "NOT CONNECTED"
ensure
mutex.unlock if multithread?
end
def mutex
@mutex ||= Mutex.new
end
def log(*args)
return unless Carrot.logging?
require 'pp'
pp args
puts
end
end
end