Permalink
Browse files

Added the ability for bi-directional data transmission.

  • Loading branch information...
1 parent 11025ad commit 44bb05f1d0f2db2c9fd9cc8083b833cee702a742 Samuel Tesla committed May 23, 2009
View
@@ -0,0 +1,13 @@
+class Echo < Pelvis::Actor
+ operation '/do/echo'
+ def echo
+ send_data :output => "Ready to receive data:"
+ recv_data do |data|
+ if "QUIT" === data[:input].chomp.upcase
+ finish
+ else
+ send_data :output => "RECEIVED: #{data[:input]}"
+ end
+ end
+ end
+end
View
@@ -128,6 +128,10 @@ def request(*args)
@invocation.request(*args)
end
+ def recv_data(&block)
+ @invocation.on_sent(&block)
+ end
+
def send_data(data)
received(data)
end
View
@@ -67,6 +67,11 @@ def duration
finished? ? @finished_at - @started_at : Time.now - @started_at
end
+ def put(data)
+ logger.debug "evocation put: #{data.inspect}"
+ @incall.put(data)
+ end
+
def inspect
"#<#{self.class} outcall=#{@outcall.inspect} identity=#{@identity.inspect}>"
end
View
@@ -67,5 +67,10 @@ def invocations
def operations
@agent.operations_for(job)
end
+
+ def put(data)
+ logger.debug "incall put: #{data.inspect}"
+ invocations.each {|i| i.put(data) }
+ end
end
end
View
@@ -3,7 +3,7 @@ class Invocation
include Logging
extend Callbacks
- callbacks :received, :completed, :failed
+ callbacks :received, :completed, :failed, :sent
def initialize(incall, actor_klass, operation)
@incall, @actor_klass, @operation = incall, actor_klass, operation
@@ -50,5 +50,10 @@ def finish
def finished?
@finished_at
end
+
+ def put(data)
+ logger.debug "invocation put: #{data.inspect}"
+ sent(data)
+ end
end
end
View
@@ -120,6 +120,13 @@ def evocations
@evocations ||= {}
end
+ def put(data)
+ logger.debug "outcall put: #{data.inspect}"
+ evocations.each do |e, finished|
+ e.put(data) unless finished
+ end
+ end
+
def inspect
"#<#{self.class} agent=#{@agent.inspect} " \
"token=#{@job.token.inspect} operation=#{@job.operation.inspect} " \
@@ -43,6 +43,12 @@ def complete
def fail(error)
failed(error)
end
+
+ def put(data)
+ @agent.send_job_data(job.token, data) do |reply|
+ # TODO: what to do with errors here?
+ end
+ end
end
end
end
@@ -30,6 +30,7 @@ def handle_job_init(stanza, node, token)
job = Job.create(token, node["scope"], node["operation"], args, {})
logger.debug "Job starting: #{job.inspect}"
incall = @protocol.agent.invoke(@identity, job)
+ incalls[job.token] = incall
incall.on_initialized do
send_result(stanza)
end
@@ -61,7 +62,12 @@ def handle_job_begin(stanza, node, token)
def handle_job_data(stanza, node, token)
data = JSON.parse(Base64.decode64(node.content)).to_mash
- incall_for(token).receive(data)
+ incall = incall_for(token)
+ if Pelvis::Incall === incall
+ incall.put(data)
+ else
+ incall.receive(data)
+ end
send_result(stanza)
end
View
@@ -0,0 +1,71 @@
+#!/usr/bin/env ruby
+
+require 'rubygems'
+
+require File.dirname(__FILE__) + '/lib/pelvis'
+
+require File.dirname(__FILE__) + '/examples/actors/herault'
+require File.dirname(__FILE__) + '/examples/actors/echo'
+
+def connect_herault
+ Pelvis.connect(:local, :identity => "herault", :advertise => false) do |agent|
+ agent.add_actor Herault
+ agent.on_advertised do
+ puts "herault is ready"
+ connect_foo
+ end
+ end
+end
+
+def connect_foo
+ Pelvis.connect(:local, :identity => "foo") do |agent|
+ agent.add_actor Echo
+ agent.on_advertised do
+ puts "foo is ready"
+ connect_bar
+ end
+ end
+end
+
+def connect_bar
+ Pelvis.connect(:local, {:identity => "bar"}) do |agent|
+ agent.on_advertised do
+ puts "bar is ready"
+
+ r = agent.request(:all, "/do/echo", {}, :identities => ["foo"])
+ r.on_received do |data|
+ puts data[:output]
+ end
+ r.on_completed do |event|
+ puts event
+ EM::stop_event_loop
+ end
+
+ prompt_for_data do |data|
+ puts "SENDING: #{data}"
+ r.put :input => data
+ end
+ end
+ end
+end
+
+def prompt_for_data(&block)
+ op = proc { $stdin.gets }
+ cb = proc {|data|
+ if "LQUIT" === data.chomp.upcase
+ EM::stop_event_loop
+ else
+ block.call(data) unless block.nil?
+ EM::defer(op, cb)
+ end
+ }
+ EM::defer(op, cb)
+end
+
+if ENV["DEBUGGER"]
+ Pelvis.logger.level = Logger::DEBUG
+end
+
+EM.run do
+ connect_herault
+end
View
@@ -0,0 +1,72 @@
+#!/usr/bin/env ruby
+
+require 'rubygems'
+
+require File.dirname(__FILE__) + '/lib/pelvis'
+require File.dirname(__FILE__) + '/lib/pelvis/protocols/xmpp'
+
+require File.dirname(__FILE__) + '/examples/actors/herault'
+require File.dirname(__FILE__) + '/examples/actors/echo'
+
+def connect_herault
+ Pelvis.connect(:xmpp, :jid => 'herault@localhost/agent', :password => 'testing', :advertise => false) do |agent|
+ agent.add_actor Herault
+ agent.on_advertised do
+ puts "herault is ready"
+ connect_echo
+ end
+ end
+end
+
+def connect_echo
+ Pelvis.connect(:xmpp, :jid => 'dummy@localhost/agent', :password => 'testing') do |agent|
+ agent.add_actor Echo
+ agent.on_advertised do
+ puts "echo is ready"
+ connect_console
+ end
+ end
+end
+
+def connect_console
+ Pelvis.connect(:xmpp, :jid => 'admin@localhost/agent', :password => 'testing') do |agent|
+ agent.on_advertised do
+ puts "console is ready"
+
+ r = agent.request(:all, "/do/echo", {}, :identities => ["dummy@localhost/agent"])
+ r.on_received do |data|
+ puts data[:output]
+ end
+ r.on_completed do |event|
+ puts event
+ EM::stop_event_loop
+ end
+
+ prompt_for_data do |data|
+ puts "SENDING: #{data}"
+ r.put :input => data
+ end
+ end
+ end
+end
+
+def prompt_for_data(&block)
+ op = proc { $stdin.gets.chomp }
+ cb = proc {|data|
+ if "LQUIT" === data.chomp.upcase
+ EM::stop_event_loop
+ else
+ block.call(data) unless block.nil?
+ EM::defer(op, cb)
+ end
+ }
+ EM::defer(op, cb)
+end
+
+if ENV["DEBUGGER"]
+ Pelvis.logger.level = Logger::DEBUG
+end
+
+EM.run do
+ connect_herault
+end
View
@@ -106,4 +106,20 @@
end
end
+ describe "that sends data" do
+ it "should work" do
+ results = TestDelegate.new
+ start_agents do |agent|
+ r = agent.request(:direct, '/echo_data', {}, :identities => [identity_for(:foo)], :delegate => results)
+ # When this is all running locally, we need to do a little dance so we
+ # don't stall the reactor. The actor will send some data that we can
+ # then respond to.
+ r.on_received do |data|
+ r.put 'input' => 'foo' if {'data' => 'prompt'} === data
+ end
+ end
+ should_be_good(results, [{'data' => 'prompt'}, {'input' => 'foo'}])
+ end
+ end
+
end
View
@@ -82,6 +82,16 @@ def limited
send_data params
finish
end
+
+ operation "/echo_data"
+ def echo_data
+ send_data 'data' => 'prompt'
+ recv_data do |data|
+ logger.debug "echo_data recv_data: #{data.inspect}"
+ send_data data
+ finish
+ end
+ end
end
require File.dirname(__FILE__) + '/helpers'

0 comments on commit 44bb05f

Please sign in to comment.