Skip to content

Commit

Permalink
Merge branch 'master' of git://github.com/gdb/bunny
Browse files Browse the repository at this point in the history
Conflicts:
	lib/bunny/session.rb
  • Loading branch information
Michael Klishin committed Jan 21, 2013
2 parents b33dc28 + bde27ec commit 1429b53
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 24 deletions.
40 changes: 22 additions & 18 deletions lib/bunny/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@next_publish_seq_no = 0
end

def read_write_timeout
@connection.read_write_timeout
end

# Opens the channel and resets its internal state
# @return [Bunny::Channel] Self
# @api public
Expand Down Expand Up @@ -574,7 +578,7 @@ def basic_qos(prefetch_count, global = false)

@connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global))

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_qos_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -593,7 +597,7 @@ def basic_recover(requeue)
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_recover_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -792,7 +796,7 @@ def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, e
add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
end

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_consume_ok = wait_on_continuations
end
# covers server-generated consumer tags
Expand Down Expand Up @@ -828,7 +832,7 @@ def basic_consume_with(consumer)
register_consumer(consumer.consumer_tag, consumer)
end

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_consume_ok = wait_on_continuations
end
# covers server-generated consumer tags
Expand All @@ -850,7 +854,7 @@ def basic_consume_with(consumer)
def basic_cancel(consumer_tag)
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_cancel_ok = wait_on_continuations
end

Expand Down Expand Up @@ -916,7 +920,7 @@ def queue_delete(name, opts = {})
opts[:if_unused],
opts[:if_empty],
false))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_queue_delete_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -936,7 +940,7 @@ def queue_purge(name, opts = {})

@connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@id, name, false))

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_queue_purge_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -972,7 +976,7 @@ def queue_bind(name, exchange, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_queue_bind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1007,7 +1011,7 @@ def queue_unbind(name, exchange, opts = {})
exchange_name,
opts[:routing_key],
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_queue_unbind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1046,7 +1050,7 @@ def exchange_declare(name, type, opts = {})
false,
false,
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_declare_ok = wait_on_continuations
end

Expand All @@ -1071,7 +1075,7 @@ def exchange_delete(name, opts = {})
name,
opts[:if_unused],
false))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_delete_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1115,7 +1119,7 @@ def exchange_bind(source, destination, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_bind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1159,7 +1163,7 @@ def exchange_unbind(source, destination, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_unbind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1187,7 +1191,7 @@ def channel_flow(active)
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_channel_flow_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1208,7 +1212,7 @@ def tx_select
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_tx_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1223,7 +1227,7 @@ def tx_commit
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_tx_commit_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1238,7 +1242,7 @@ def tx_rollback
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_tx_rollback_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -1278,7 +1282,7 @@ def confirm_select(callback=nil)
@confirms_callback = callback

@connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_confirm_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down
8 changes: 6 additions & 2 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ def start
@default_channel = self.create_channel
end

def read_write_timeout
@transport.read_write_timeout
end

# Opens a new channel and returns it. This method will block the calling
# thread until the response is received and the channel is guaranteed to be
# opened (this operation is very fast and inexpensive).
Expand All @@ -177,7 +181,7 @@ def close
if @transport.open?
close_all_channels

Bunny::Timer.timeout(@disconnect_timeout, ClientTimeout) do
Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do
self.close_connection(false)
end
end
Expand Down Expand Up @@ -272,7 +276,7 @@ def close_channel(ch)
# @private
def close_all_channels
@channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
Bunny::Timer.timeout(@disconnect_timeout, ClientTimeout) { ch.close }
Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
end
end

Expand Down
6 changes: 5 additions & 1 deletion lib/bunny/system_timer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ module Bunny
# Used for Ruby before 1.9
class SystemTimer
def self.timeout(seconds, exception)
::SystemTimer.timeout_after(seconds) do
if seconds
::SystemTimer.timeout_after(seconds, exception) do
yield
end
else
yield
end
end
Expand Down
9 changes: 6 additions & 3 deletions lib/bunny/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Transport
DEFAULT_CONNECTION_TIMEOUT = 5.0


attr_reader :session, :host, :port, :socket, :connect_timeout
attr_reader :session, :host, :port, :socket, :connect_timeout, :read_write_timeout, :disconnect_timeout

def initialize(session, host, port, opts)
@session = session
Expand All @@ -31,8 +31,9 @@ def initialize(session, host, port, opts)

@read_write_timeout = opts[:socket_timeout] || 1
@read_write_timeout = nil if @read_write_timeout == 0
@disconnect_timeout = @read_write_timeout || @connect_timeout
@connect_timeout = self.timeout_from(opts)
@connect_timeout = nil if @connect_timeout == 0
@disconnect_timeout = @read_write_timeout || @connect_timeout

@frames = Hash.new { Array.new }

Expand Down Expand Up @@ -132,7 +133,9 @@ def send_frame(frame)
if closed?
@session.handle_network_failure(ConnectionClosedError.new(frame))
else
send_raw(frame.encode)
frame.encode_to_array.each do |component|
send_raw(component)
end
end
end

Expand Down

0 comments on commit 1429b53

Please sign in to comment.