Skip to content

Commit

Permalink
Refactor the distributed client stuff, add basic authentication based…
Browse files Browse the repository at this point in the history
… on a shared token
  • Loading branch information
Sutto committed Sep 15, 2009
1 parent 7455f2f commit 67f003a
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 190 deletions.
5 changes: 3 additions & 2 deletions lib/marvin/abstract_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ def self.configure
config.merge! nash
end
@@configuration = config
# TODO: Remove Duplication
Marvin::CoreCommands.register!
# Help is only currently available on an instance running
# distributed handler.
Marvin::CoreCommands.register! unless Marvin::Distributed::Handler.registered?
@setup = true
end

Expand Down
7 changes: 4 additions & 3 deletions lib/marvin/distributed.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Marvin
module Distributed
autoload :Server, 'marvin/distributed/server'
autoload :Handler, 'marvin/distributed/handler'
autoload :Client, 'marvin/distributed/client'
autoload :Protocol, 'marvin/distributed/protocol'
autoload :Server, 'marvin/distributed/server'
autoload :Handler, 'marvin/distributed/handler'
autoload :Client, 'marvin/distributed/client'
end
end
79 changes: 35 additions & 44 deletions lib/marvin/distributed/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,75 +37,51 @@ def method_missing(name, *args)

end

class EMConnection < EventMachine::Protocols::LineAndTextProtocol
is :loggable
class EMConnection < Marvin::Distributed::Protocol

register_handler_method :event
register_handler_method :authentication_failed
register_handler_method :authenticated
register_handler_method :unauthorized

cattr_accessor :stopping
self.stopping = false

attr_accessor :client, :port, :connection_host, :connection_port
attr_accessor :client, :port, :connection_host, :connection_port, :configuration

def initialize(*args)
config = args.last.is_a?(Marvin::Nash) ? args.pop : Marvin::Nash.new
@configuration = args.last.is_a?(Marvin::Nash) ? args.pop : Marvin::Nash.new
super(*args)
@callbacks = {}
@client = Marvin::Distributed::Client.new(self)
@authenticated = false
end

def post_init
super
logger.info "Connected to distributed server"
if configuration.token?
logger.info "Attempting to authenticate..."
send_message(:authenticate, {:token => configuration.token})
end
end

def unbind
if self.stopping
logger.info "Stopping distributed client"
else
logger.info "Lost connection to distributed client - Scheduling reconnect"
EventMachine.add_timer(15) { EMConnection.connect(connection_host, connection_port) }
EventMachine.add_timer(15) { EMConnection.connect(connection_host, connection_port, @configuration) }
end
super
end

def receive_line(line)
line.strip!
logger.debug "<< #{line}"
response = JSON.parse(line)
handle_response(response)
rescue JSON::ParserError
logger.warn "Error parsing input: #{line}"
rescue Exception => e
logger.warn "Uncaught exception raised; Likely in Marvin"
Marvin::ExceptionTracker.log(e)
end

def send_message(name, arguments = {}, &callback)
logger.debug "Sending #{name.inspect} to #{self.host_with_port}"
payload = {
"message" => name.to_s,
"options" => arguments,
"sent-at" => Time.now
}
payload.merge!(options_for_callback(callback))
send_data "#{JSON.dump(payload)}\n"
end

def handle_response(response)
return unless response.is_a?(Hash) && response.has_key?("message")
options = response["options"] || {}
process_callback(response)
case response["message"]
when "event"
handle_event(options)
end
end

def handle_event(options = {})
event = options["event-name"]
event = options["event-name"]
client_host = options["client-host"]
client_nick = options["client-nick"]
options = options["event-options"]
options = {} unless options.is_a?(Hash)
options = options["event-options"]
options = {} unless options.is_a?(Hash)
return if event.blank?
begin
logger.debug "Handling #{event}"
Expand All @@ -128,9 +104,24 @@ def handle_event(options = {})
end
end

def self.connect(host, port)
def handle_unauthorized(options = {})
logger.warn "Attempted action when unauthorized. Stopping client."
Marvin::Distributed::Client.stop
end

def handle_authenticated(options = {})
@authenticated = true
logger.info "Successfully authenticated with #{host_with_port}"
end

def handle_authentication_failed(options = {})
logger.info "Authentication with #{host_with_port} failed. Stopping."
Marvin::Distributed::Client.stop
end

def self.connect(host, port, config = Marvin::Nash.new)
logger.info "Attempting to connect to #{host}:#{port}"
EventMachine.connect(host, port, self) do |c|
EventMachine.connect(host, port, self, config) do |c|
c.connection_host = host
c.connection_port = port
end
Expand Down Expand Up @@ -198,7 +189,7 @@ def run
opts = opts.client || Marvin::Nash.new
host = opts.host || "0.0.0.0"
port = (opts.port || 8943).to_i
EMConnection.connect(host, port)
EMConnection.connect(host, port, opts)
end
end

Expand Down
78 changes: 0 additions & 78 deletions lib/marvin/distributed/drb_client.rb

This file was deleted.

12 changes: 12 additions & 0 deletions lib/marvin/distributed/handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def dispatch(name, options, client = self.client)
return if client.blank?
server = Marvin::Distributed::Server.next
if server.blank?
logger.debug "Distributed handler is currently busy - adding to queue"
# TODO: Add to queued messages, wait
@message_queue << [name, options, client]
run! unless running?
Expand All @@ -36,7 +37,13 @@ def dispatch(name, options, client = self.client)

def process_queue
count = [@message_queue.size, Server.free_connections.size].min
logger.debug "Processing #{count} item(s) from the message queue"
count.times { |item| dispatch(*@message_queue.shift) }
if @message_queue.empty?
logger.debug "The message queue is now empty"
else
logger.debug "The message queue still has #{count} item(s)"
end
check_queue_progress
end

Expand All @@ -59,6 +66,11 @@ def check_queue_progress

class << self

def whitelist_event(name)
EVENT_WHITELIST << name.to_sym
EVENT_WHITELIST.uniq!
end

def register!(*args)
# DO NOT register if this is not a normal client.
return unless Marvin::Loader.client?
Expand Down
88 changes: 88 additions & 0 deletions lib/marvin/distributed/protocol.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
module Marvin
module Distributed
class Protocol < EventMachine::Protocols::LineAndTextProtocol
is :loggable

class_inheritable_accessor :handler_methods
self.handler_methods = {}

attr_accessor :callbacks

def receive_line(line)
line.strip!
logger.debug "<< #{line}"
response = JSON.parse(line)
handle_response(response)
rescue JSON::ParserError
logger.debug "JSON parsing error for #{line.inspect}"
rescue Exception => e
Marvin::ExceptionTracker.log(e)
end

def send_message(name, arguments = {}, &callback)
logger.debug "Sending #{name.inspect} to #{self.host_with_port}"
payload = {
"message" => name.to_s,
"options" => arguments,
"sent-at" => Time.now
}
payload.merge!(options_for_callback(callback))
payload = JSON.dump(payload)
logger.debug ">> #{payload}"
send_data "#{payload}\n"
end

def handle_response(response)
logger.debug "Handling response in distributed protocol (response => #{response.inspect})"
return unless response.is_a?(Hash) && response.has_key?("message")
options = response["options"] || {}
process_response_message(response["message"], options)
end

def host_with_port
@host_with_port ||= begin
port, ip = Socket.unpack_sockaddr_in(get_peername)
"#{ip}:#{port}"
end
end

protected

def options_for_callback(blk)
return {} if blk.blank?
cb_id = "callback-#{seld.object_id}-#{Time.now.to_f}"
count = 0
count += 1 while @callbacks.has_key?(Digest::SHA256.hexdigest("#{cb_id}-#{count}"))
final_id = Digest::SHA256.hexdigest("#{cb_id}-#{count}")
@callbacks ||= {}
@callbacks[final_id] = blk
{"callback-id" => final_id}
end

def process_callback(hash)
@callbacks ||= {}
if hash.is_a?(Hash) && hash.has_key?("callback-id")
callback = @callbacks.delete(hash["callback-id"])
callback.call(self, hash)
end
end

def process_response_message(message, options)
method = self.handler_methods[message.to_s]
if method.present? && respond_to?(method)
logger.debug "Dispatching #{message} to #{method}"
send(method, options)
else
logger.warn "Got unknown message (#{message}) with options: #{options.inspect}"
end
end

def self.register_handler_method(name, method = nil)
name = name.to_s
method ||= "handle_#{name}".to_sym
self.handler_methods[name] = method
end

end
end
end
Loading

0 comments on commit 67f003a

Please sign in to comment.