Navigation Menu

Skip to content

Commit

Permalink
Implement Cool.io backend for Droonga protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Apr 10, 2014
1 parent 9294bbb commit 40a0201
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 0 deletions.
1 change: 1 addition & 0 deletions droonga-client.gemspec
Expand Up @@ -39,6 +39,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency "fluent-logger"
spec.add_runtime_dependency "rack"
spec.add_runtime_dependency "yajl-ruby"
spec.add_runtime_dependency "droonga-message-pack-packer"

spec.add_development_dependency "bundler", "~> 1.3"
spec.add_development_dependency "rake"
Expand Down
232 changes: 232 additions & 0 deletions lib/droonga/client/connection/droonga-protocol/coolio.rb
@@ -0,0 +1,232 @@
# Copyright (C) 2014 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

require "coolio"
require "droonga/message-pack-packer"

module Droonga
class Client
module Connection
class DroongaProtocol
class Coolio
class Request
def initialize(receiver, id, loop)
@receiver = receiver
@id = id
@loop = loop
end

def wait
return if @receiver.received?(@id)
until @receiver.received?(@id)
@loop.run_once
end
end
end

class InfiniteRequest
def initialize(loop)
@loop = loop
end

def wait
@loop.run
end
end

class Sender < ::Coolio::TCPSocket
def initialize(socket)
super(socket)
end

def send(tag, data)
fluent_message = [tag, Time.now.to_i, data]
packed_fluent_message = MessagePackPacker.pack(fluent_message)
write(packed_fluent_message)
end
end

class Receiver < ::Coolio::TCPServer
def initialize(*args)
super(*args) do |engine|
@engines << engine
handle_engine(engine)
end
@requests = {}
@engines = []
end

def close
super
@engines.each do |engine|
engine.close
end
@engines.clear
end

def host
@listen_socket.addr[3]
end

def port
@listen_socket.addr[1]
end

def droonga_name
"#{host}:#{port}/droonga"
end

def register(id, &callback)
@requests[id] = {
:received => false,
:callback => callback,
}
end

def unregister(id)
@requests.delete(id)
end

def received?(id)
if @requests.key?(id)
@requests[id][:received]
else
true
end
end

private
def handle_engine(engine)
unpacker = MessagePack::Unpacker.new
on_read = lambda do |data|
unpacker.feed_each(data) do |fluent_message|
tag, time, droonga_message = fluent_message
id = droonga_message["inReplyTo"]
request = @requests[id]
next if request.nil?
request[:received] = true
request[:callback].call(fluent_message)
end
end
engine.on_read do |data|
on_read.call(data)
end

on_close = lambda do
@engines.delete(engine)
end
engine.on_close do
on_close.call
end
end
end

def initialize(host, port, tag, options={})
@host = host
@port = port
@tag = tag
default_options = {
}
@options = default_options.merge(options)
@loop = options[:loop] || ::Coolio::Loop.default

@sender = Sender.connect(@host, @port)
@sender.attach(@loop)
@receiver_host = @options[:receiver_host] || Socket.gethostname
@receiver_port = @options[:receiver_port] || 0
@receiver = Receiver.new(@receiver_host, @receiver_port)
@receiver.attach(@loop)
end

def request(message, options={}, &block)
id = message["id"] || generate_id
message = message.merge("id" => id,
"replyTo" => @receiver.droonga_name)
send(message, options)

sync = block.nil?
if sync
response = nil
block = lambda do |_response|
response = _response
end
end
@receiver.register(id) do
@receiver.unregister(id)
block.call
end
request = Request.new(@receiver, id, @loop)
if sync
request.wait
response
else
request
end
end

def subscribe(message, options={}, &block)
id = message["id"] || generate_id
message = message.merge("id" => id,
"from" => @receiver.droonga_name)
send(message, options)

request = InfiniteRequest.new(@loop)
sync = block.nil?
if sync
yielder = nil
buffer = []
@receiver.register(id) do |response|
if yielder
while (old_response = buffer.shift)
yielder << old_response
end
yielder << response
else
buffer << response
end
end
Enumerator.new do |_yielder|
yielder = _yielder
request.wait
end
else
@receiver.register(id, &block)
request
end
end

def send(message, options={}, &block)
if message["id"].nil? or message["date"].nil?
id = message["id"] || generate_id
date = message["date"] || Time.now
message = message.merge("id" => id, "date" => date)
end
@sender.send("#{@tag}.message", message)
end

def close
@sender.close
@receiver.close
end

private
def generate_id
Time.now.to_f.to_s
end
end
end
end
end
end

0 comments on commit 40a0201

Please sign in to comment.