Permalink
Browse files

[NOT BACKWARDS COMPATIBLE] Comm using unix sockets

This commit is not backwards compatible!
The tactics are now communicating with the
tactics scripts using unix sockets rather than stdio.
The unix socket is passed to the tactic script as the
first and only argument.

The tactics now use EventMachine connections rather
than threads. This should be cleaner, and hopefully better.

Performance is still abysmal.
  • Loading branch information...
1 parent 268004c commit e0e691d1df77404483b57f0b6de5ed68ef32fea4 @sebastian sebastian committed Dec 26, 2011
@@ -22,8 +22,10 @@ def initialize options, &block
super options
self.run_bg
+
self.register_callback(:needed_truth_scratch) do |d|
@return_value = @callback.call(d)
+ terminate_question
@working = false
end
end
@@ -32,11 +34,17 @@ def answer
# Spin lock until the value is returned
while @working do end
- # terminate the question
- self.stop
-
# Returns what was returned by the
@return_value
end
+
+ private
+ def terminate_question
+ @hero = self
+ EM.add_timer(0.1) do
+ @hero.sync_do {@hero.remove_subscriptions <~ [[@solver, [ip_port]]]}
+ @hero.stop
+ end
+ end
end
end
@@ -75,13 +75,12 @@ class Solver
def initialize node_name = "default", options = {}
@name = node_name
learn_about_tactics
-
+
super options
self.run_bg
end
def resolve what, user_info
- puts "Attempting to resolve '#{what}' with user info #{user_info}"
options = {:what => what, :solver => ip_port, :user_info => user_info}
question = Question.new options do |truths|
puts "QUESTION #{what}"
@@ -92,13 +91,12 @@ def resolve what, user_info
puts "\tprovider: #{who}"
puts "\tuser_info: #{user_info}"
if answer.class == Array then
- puts "\tdata: #{answer.join(", ")}"
+ puts "\tdata: [#{answer.join(", ")}]"
else
- puts "\tdata: #{answer}"
+ puts "\tdata: [#{answer}]"
end
end
end
- # question.answer
end
def shutdown
@@ -111,8 +109,7 @@ def explore_truth_space_for what, user_info
tactic[:provides].each do |thing|
if thing.match(what) then
options = {:what => what}
- puts "Calling tactic.new #{tactic[:name]}"
- Tactic.new tactic[:name], ip_port, @name, user_info, options
+ Tactic.new tactic[:dir_name], ip_port, @name, user_info, options
end
end
end
@@ -3,6 +3,41 @@
require 'open3'
module TacticSolver
+ class TacticCommunicator < EventMachine::Connection
+ def initialize tactic
+ @_tactic = tactic
+ super
+
+ @_tactic.communication_agent = self
+ @_tactic.send_initial_data
+ end
+
+ def stop_communicator
+ close_connection_after_writing
+ end
+
+ def pass_on_data data
+ send_data "#{data.to_json}\n"
+ end
+
+ def receive_data data
+ data.split("\n").each do |d|
+ begin
+ e = JSON.parse(d)
+ @_tactic.deal_with e
+ rescue JSON::ParserError
+ $stderr.puts "Couldn't parse the input"
+ end
+ end
+ end
+
+ def unbind
+ @_tactic.communication_agent = nil
+ @_tactic.stop_tactic
+ end
+
+ end
+
class FailedTactic < Exception
def initialize tactic, description
@tactic = tactic
@@ -40,12 +75,13 @@ def initialize dir_name, solver, node_name, user_info, options = {}
@_user_info = user_info
@_what = options[:what] || nil
@_perform_delayed_execution = @_what ? true : false
+ @_is_running = true
setup_tactic
super options
- self.run_bg
register_callbacks
+ self.run_bg
end
def register_callbacks
@@ -59,16 +95,18 @@ def register_callbacks
end
end
- def shut_down
+ def stop_tactic
+ return unless @_is_running
+
# Remove subscriptions from solver
self.sync_do {remove_subscriptions <~ [[@_solver, [ip_port]]]}
-
+ @_communication_agent.stop_communicator if @_communication_agent
self.stop
- @_io_in.close if @_io_in
- @_io_out.close if @_io_out
- @_io_err.close if @_io_err
- @_thread_out.terminate
- @_thread_err.terminate
+
+ # Remove the unix-socket we created as it is no longer needed
+ FileUtils.rm(socket_name)
+
+ @_is_running = false
end
#---------------------------
@@ -97,22 +135,22 @@ def execute what
end
set_the_magic_variables what
+ start_program
# Find what the tactic requires
needed_parameters = requirements @_requires
# Add requirements
needed_parameters.each {|p| add_requirement p}
+ end
- start_program
-
- # Add all known data into the bloom system to bootstrap the resolution
- # process
- pass_on_truth "what", "initial_value", @_what
- pass_on_truth "port", "initial_value", @_port
- pass_on_truth "destination", "initial_value", @_destination
- pass_on_truth "domain", "initial_value", @_domain
- pass_on_truth "resource", "initial_value", @_resource
- pass_on_truth "user", "initial_value", @_user_info
+ def send_initial_data
+ # Pass standard facts to the tactic
+ pass_on_truths [["what", "initial_value", @_what],
+ ["port", "initial_value", @_port],
+ ["destination", "initial_value", @_destination],
+ ["domain", "initial_value", @_domain],
+ ["resource", "initial_value", @_resource],
+ ["user", "initial_value", @_user_info]]
end
def self.provides dir_name, node_name
@@ -122,7 +160,7 @@ def self.provides dir_name, node_name
config['provides'].each {|something|
provides << Regexp.new("^#{Tactic.deal_with_magic(something, node_name)}")
}
- {:name => name, :provides => provides}
+ {:name => name, :provides => provides, :dir_name => dir_name}
rescue Errno::ENOENT
print_error "Missing configuration file: " \
@@ -155,6 +193,19 @@ def self.print_error name, description
raise FailedTactic.new name, description
end
+ def communication_agent= agent
+ @_communication_agent = agent
+ # Some times we have received data before we are ready
+ # with a communication agent.
+ # If that has been the case, then we need to send on the
+ # data once we have a communication agent.
+ if @_pending_pass_on_data then
+ @_pending_pass_on_data.each do |data|
+ @_communication_agent.pass_on_data data
+ end
+ end
+ end
+
private
def self.deal_with_magic provision, node_name
prov = provision
@@ -190,14 +241,32 @@ def add_truth truth, value, user_info
}
end
+ def pass_on_truths truth_vals
+ truths = []
+ truth_vals.each do |truth, source, value|
+ new_truth = {
+ :what => truth,
+ :source => source,
+ :value => value
+ }
+ truths << new_truth
+ end
+ data = {:truths => truths}
+ # Things at times happen out of order.
+ # If that is the case, and we don't yet have
+ # a communication agent, then we add the data
+ # to the list of pending data items, and deliver
+ # them once we have a communication agent!
+ if @_communication_agent then
+ @_communication_agent.pass_on_data data
+ else
+ @_pending_pass_on_data ||= []
+ @_pending_pass_on_data << data
+ end
+ end
+
def pass_on_truth truth, source, value
- new_truth = {
- :what => truth,
- :source => source,
- :value => value
- }
- data = {:truths => [new_truth]}
- @_io_in.puts data.to_json
+ pass_on_truths [[truth, source, value]]
end
def add_requirement requirement
@@ -252,38 +321,21 @@ def setup_tactic
end
- def start_program
- # Start the program
- @_io_in, @_io_out, @_io_err = Open3.popen3("#{@_tactic_folder}/#{@_executable}")
- @_io_in.sync = true
-
- @_thread_out = Thread.new(@_io_out, self) do |out, tactic|
- while true
- begin
- json_from_program = out.gets
- data = JSON.parse(json_from_program)
-
- tactic.deal_with data
-
- rescue e
- puts "ERROR [#{@_name}]: got malformed response from process."
-
- end
- end
+ def socket_name
+ unless @socket_name then
+ long_name = (0...50).map{ ('a'..'z').to_a[rand(26)] }.join
+ @socket_name = "/tmp/signpost-#{long_name}.sock"
end
+ @socket_name
+ end
- @_thread_err = Thread.new(@_io_err) do |err|
- while true
- begin
- error_text = err.gets
- puts "STDERR [#{@_name}] : #{error_text}" unless error_text.strip.chomp == ""
-
- rescue e
- puts "ERROR [#{@_name}]: reading error in STDERR reading thread for #{@_name}"
-
- end
- end
- end
+ def start_program
+ EventMachine::start_unix_domain_server(socket_name, TacticCommunicator, self)
+ File.chmod(0777, socket_name)
+ full_executable = "#{@_tactic_folder}/#{@_executable} #{socket_name}"
+ IO.popen(full_executable)
+ # tactic_script = fork {exec "#{@_tactic_folder}/#{@_executable} #{socket_name}"}
+ # Process.detach(tactic_script)
end
def check_file_exists *files
Oops, something went wrong.

0 comments on commit e0e691d

Please sign in to comment.