Navigation Menu

Skip to content

Commit

Permalink
Support max_messages option for sibscription with Cool.io
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 14, 2015
1 parent d07a8e8 commit ab350f3
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion lib/droonga/client/connection/droonga-protocol/coolio.rb
Expand Up @@ -102,13 +102,16 @@ def on_connect_failed
end

class Receiver < ::Coolio::TCPServer
attr_accessor :max_messages

def initialize(*args)
super(*args) do |engine|
@engines << engine
handle_engine(engine)
end
@requests = {}
@engines = []
@max_messages = nil
end

def close
Expand Down Expand Up @@ -153,14 +156,21 @@ def received?(id)
private
def handle_engine(engine)
unpacker = MessagePack::Unpacker.new
n_messages = 0
on_read = lambda do |data|
unpacker.feed_each(data) do |fluent_message|
tag, time, droonga_message = fluent_message
id = droonga_message["inReplyTo"]
request = @requests[id]
next if request.nil?
n_messages += 1
if request
request[:received] = true
request[:callback].call(droonga_message)
end
if @max_messages and
n_messages >= @max_messages
unregister(id)
end
end
end
engine.on_read do |data|
Expand Down Expand Up @@ -231,6 +241,7 @@ def subscribe(message, options={}, &block)
request_options = {
:subscription_timeout => options[:subscription_timeout],
}
@receiver.max_messages = options[:max_messages]
request = InfiniteRequest.new(@loop, request_options)
request.on_timeout = lambda do
@receiver.unregister(id)
Expand Down

0 comments on commit ab350f3

Please sign in to comment.