Navigation Menu

Skip to content

Commit

Permalink
in: use server plugin helper
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Apr 24, 2017
1 parent 26fba85 commit e88e0c4
Showing 1 changed file with 76 additions and 88 deletions.
164 changes: 76 additions & 88 deletions lib/fluent/plugin/in_groonga.rb
Expand Up @@ -22,14 +22,15 @@
require "gqtp"
require "groonga/command/parser"

require "fluent/input"
require "fluent/process"
require "fluent/plugin/input"

module Fluent
module Plugin
class GroongaInput < Input
Plugin.register_input("groonga", self)

helpers :server

def initialize
super
end
Expand All @@ -49,12 +50,43 @@ def configure(conf)

def start
super
@input.start

port = @input.port
bind = @input.bind
log.info("[input][groonga][connect] listening port",
:port => port, :bind => bind)
server_create_connection(:groonga_input,
port,
:proto => :tcp,
:bind => bind) do |connection|
handler = nil
real_host = @input.real_host
real_port = @input.real_port
repeater = Coolio::TCPSocket.connect(real_host, real_port)
repeater.on_connect_failed do
log.error("[input][groonga][connect][error] " +
"failed to connect to Groonga:",
:real_host => real_host,
:real_port => real_port)
connection.close
end
repeater.on_read do |data|
handler.write_back(data)
end
repeater.on_close do
handler.close
end
event_loop_attach(repeater)

handler = @input.create_handler(connection, repeater)
connection.data do |data|
handler.on_read(data)
end
end
end

def shutdown
super
@input.shutdown
end

class Repeater < Coolio::TCPSocket
Expand All @@ -74,7 +106,6 @@ def on_close

class BaseInput
include Configurable
include DetachMultiProcessMixin

config_param :bind, :string, :default => "0.0.0.0"
config_param :port, :integer, :default => nil
Expand Down Expand Up @@ -131,37 +162,6 @@ def configure(conf)
@real_port ||= default_port
end

def start
listen_socket = TCPServer.new(@bind, @port)
# detach_multi_process do
@loop = Coolio::Loop.new

@socket = Coolio::TCPServer.new(listen_socket, nil,
handler_class, self)
@loop.attach(@socket)

@shutdown_notifier = Coolio::AsyncWatcher.new
@loop.attach(@shutdown_notifier)

@thread = Thread.new do
run
end
# end
end

def shutdown
@loop.stop
@socket.close
@shutdown_notifier.signal
@thread.join
end

def create_repeater(client)
repeater = Repeater.connect(@real_host, @real_port, client)
repeater.attach(@loop)
repeater
end

def emit(command, params)
normalized_command = command.split(".")[0]
return unless emit_command?(normalized_command)
Expand All @@ -175,14 +175,6 @@ def log
end

private
def run
@loop.run
rescue
log.error("[input][groonga][error] unexpected error",
:error => "#{$!.class}: #{$!}")
log.error_backtrace
end

def emit_command?(command)
return true if @emit_commands.empty?
@emit_commands.any? do |pattern|
Expand All @@ -192,30 +184,20 @@ def emit_command?(command)
end

class HTTPInput < BaseInput
def create_handler(connection, repeater)
Handler.new(self, connection, repeater)
end

private
def default_port
10041
end

def handler_class
Handler
end

class Handler < Coolio::Socket
def initialize(socket, input)
super(socket)
class Handler
def initialize(input, connection, repeater)
@input = input
end

def on_connect
@repeater = @input.create_repeater(self)
@repeater.on_connect_failed do
@input.log.error("[input][groonga][connect][error] " +
"failed to connect to Groonga:",
:real_host => @input.real_host,
:real_port => @input.real_port)
close
end
@connection = connection
@repeater = repeater
@request_handler = RequestHandler.new(@input, @repeater)
@response_handler = ResponseHandler.new(self, @input)
end
Expand Down Expand Up @@ -249,19 +231,23 @@ def write_back(data)
reply_error_response("500 Internal Server Error")
return
end
write(data)
@connection.write(data)
end

def on_response_complete(response)
if need_emit?(response)
@input.emit(@request_handler.command,
@request_handler.params)
end
on_write_complete do
@connection.on(:write_complete) do
@repeater.close
end
end

def close
@connection.close
end

private
def need_emit?(response)
case @request_handler.command
Expand All @@ -279,13 +265,12 @@ def need_emit?(response)
end

def reply_error_response(status)
write("HTTP1.1 #{status}\r\n")
write("Server: fluent-plugin-groonga\r\n")
write("Connection: close\r\n")
write("Content-Length: 0\r\n")
write("\r\n")
disable
on_write_complete do
@connection.write("HTTP1.1 #{status}\r\n")
@connection.write("Server: fluent-plugin-groonga\r\n")
@connection.write("Connection: close\r\n")
@connection.write("Content-Length: 0\r\n")
@connection.write("\r\n")
@connection.on(:write_complete) do
@repeater.close
end
end
Expand Down Expand Up @@ -410,37 +395,40 @@ def on_message_complete
end

class GQTPInput < BaseInput
def create_handler(connection, repeater)
Handler.new(self, connection, repeater)
end

private
def default_port
10043
end

def handler_class
Handler
end

class Handler < Coolio::Socket
def initialize(socket, input)
super(socket)
class Handler
def initialize(input, connection, repeater)
@input = input
end
@connection = connection
@repeater = repeater

def on_connect
@parser = Parser.new(@input)
@repeater = @input.create_repeater(self)
@request_parser = RequestParser.new(@input)
end

def on_read(data)
@parser << data
@request_parser << data
@repeater.write(data)
end

def on_close
@parser.close
def write_back(data)
@connection.write(data)
end

def close
@request_parser.close
@connection.close
end
end

class Parser < GQTP::Parser
class RequestParser < GQTP::Parser
def initialize(input)
super()
@input = input
Expand All @@ -463,13 +451,13 @@ def close
def initialize_command_parser
@command_parser = Groonga::Command::Parser.new
@command_parser.on_command do |command|
@input.emit(command.name, command.arguments)
@input.emit(command.command_name, command.arguments)
end
@command_parser.on_load_value do |command, value|
arguments = command.arguments.dup
arguments[:columns] = command.columns.join(", ")
arguments[:values] = Yajl::Encoder.encode([value])
@input.emit(command.name, arguments)
@input.emit(command.command_name, arguments)
end
end
end
Expand Down

0 comments on commit e88e0c4

Please sign in to comment.