Navigation Menu

Skip to content

Commit

Permalink
Separate subscription timeout and receiver timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 14, 2015
1 parent f3b471c commit e6d177b
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
8 changes: 4 additions & 4 deletions lib/droonga/client/connection/droonga-protocol/coolio.rb
Expand Up @@ -41,12 +41,12 @@ class InfiniteRequest

def initialize(loop, options={})
@loop = loop
@timeout_seconds = options[:timeout_seconds]
@subscription_timeout = options[:subscription_timeout]
end

def wait
if @timeout_seconds
@timer = Coolio::TimerWatcher.new(@timeout_seconds)
if @subscription_timeout
@timer = Coolio::TimerWatcher.new(@subscription_timeout)
@timer.on_timer do
@timer.detach
@on_timeout.call if @on_timeout
Expand Down Expand Up @@ -229,7 +229,7 @@ def subscribe(message, options={}, &block)

id = message["id"]
request_options = {
:timeout_seconds => options[:timeout_seconds],
:subscription_timeout => options[:subscription_timeout],
}
request = InfiniteRequest.new(@loop, request_options)
request.on_timeout = lambda do
Expand Down
15 changes: 12 additions & 3 deletions lib/droonga/client/connection/droonga-protocol/thread.rb
Expand Up @@ -78,9 +78,10 @@ def subscribe(message, options={}, &block)
message["from"] = receive_end_point
send(message, options)

timeout_seconds = options[:timeout_seconds]
subscription_timeout = options[:subscription_timeout]
start = Time.now
receive_options = {
:timeout => timeout_seconds,
:timeout => options[:timeout],
}
sync = block.nil?
if sync
Expand All @@ -89,6 +90,10 @@ def subscribe(message, options={}, &block)
receiver.receive(receive_options) do |object|
yielder << object
end
if subscription_timeout
elapsed_seconds = Time.now - start
break if elapsed_seconds >= subscription_timeout
end
end
receiver.close
end
Expand All @@ -97,6 +102,10 @@ def subscribe(message, options={}, &block)
begin
loop do
receiver.receive(receive_options, &block)
if subscription_timeout
elapsed_seconds = Time.now - start
break if elapsed_seconds >= subscription_timeout
end
end
ensure
receiver.close
Expand Down Expand Up @@ -164,7 +173,7 @@ def receive(options={}, &block)
timeout = options[:timeout]
catch do |tag|
loop do
start = Time.new
start = Time.now
readable_ios, = IO.select(@read_ios, nil, nil, timeout)
break if readable_ios.nil?
if timeout
Expand Down

0 comments on commit e6d177b

Please sign in to comment.