From d9b806a74af498683a992f009b3882c1563cd806 Mon Sep 17 00:00:00 2001 From: Ben Langfeld Date: Fri, 8 Jan 2016 09:52:11 -0200 Subject: [PATCH] Make Stream usable in a supervision tree Supervisors don't tolerate a secondary startup step. As such, we won't have a handle on the stream before it's already receiving events, so we need to pass the stream to the event handler in order for it to respond. --- CHANGELOG.md | 1 + README.md | 32 +++++++++++++++--------- lib/ruby_ami/stream.rb | 3 ++- spec/ruby_ami/stream_spec.rb | 48 +++++++++++------------------------- 4 files changed, 38 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 793d674..9682930 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # [develop](https://github.com/adhearsion/ruby_ami) * Breaking change: Removed the deprecated `RubyAMI::Client` because it is no longer relevant. + * Breaking change: Start the connection when the `RubyAMI::Stream` starts, permitting supervised restart of the stream on connection failure. To support this, the event callback now passes a second parameter which is the stream itself. # [2.4.0](https://github.com/adhearsion/ruby_ami/compare/v2.3.0...v2.4.0) - [2015-12-07](https://rubygems.org/gems/ruby_ami/versions/2.4.0) # Feature: Reveal the AMI version for a `Stream` via `Stream#version` diff --git a/README.md b/README.md index 4d19bc1..fc3d167 100644 --- a/README.md +++ b/README.md @@ -21,10 +21,6 @@ In order to setup a connection to listen for AMI events, one can do: ```ruby require 'ruby_ami' -stream = RubyAMI::Stream.new '127.0.0.1', 5038, 'manager', 'password', - ->(e) { handle_event e }, - Logger.new(STDOUT), 10 - def handle_event(event) case event.name when 'FullyBooted' @@ -34,28 +30,40 @@ def handle_event(event) end end -stream.run # This will block until the actor is terminated elsewhere. stream.async.run is also available if you need to do other things in the main thread. +stream = RubyAMI::Stream.new '127.0.0.1', 5038, 'manager', 'password', + ->(e) { handle_event e }, + Logger.new(STDOUT), 10 + +Celluloid.join(stream) # This will block until the actor is terminated elsewhere. Otherwise, the actor will run in its own thread allowing other work to be done here. ``` -It is also possible to execute actions in response to events: +Note that using `Stream.new`, the actor will shut down when the connection is lost (and in this case the program will exit). If it is necessary to restart the actor on failure, you can start it in a Celluloid supervisor: ```ruby -require 'ruby_ami' - -$stream = RubyAMI::Stream.new '127.0.0.1', 5038, 'manager', 'password', +RubyAMI::Stream.supervise_as :ami_connection, '127.0.0.1', 5038, 'manager', 'password', ->(e) { handle_event e }, Logger.new(STDOUT), 10 +``` -def handle_event(event) +It is also possible to execute actions in response to events: + +```ruby +require 'ruby_ami' + +def handle_event(event, stream) case event.name when 'FullyBooted' puts "The connection was successful. Originating a call." - response = $stream.send_action 'Originate', 'Channel' => 'SIP/foo' + response = stream.send_action 'Originate', 'Channel' => 'SIP/foo' puts "The call origination resulted in #{response.inspect}" end end -$stream.run +stream = RubyAMI::Stream.new '127.0.0.1', 5038, 'manager', 'password', + ->(e) { handle_event e }, + Logger.new(STDOUT), 10 + +Celluloid.join(stream) # This will block until the actor is terminated elsewhere. Otherwise, the actor will run in its own thread allowing other work to be done here. ``` Executing actions does not strictly have to be done within the event handler, but it is not valid to send AMI events before receiving a `FullyBooted` event. If you attempt to execute an action prior to this, it may fail, and `RubyAMI::Stream` will not help you recover or queue the action until the connection is `FullyBooted`; you must manage this timing yourself. That said, assuming you take care of this, you may invoke `RubyAMI::Stream#send_action` from anywhere in your code and it will return the response of the action. diff --git a/lib/ruby_ami/stream.rb b/lib/ruby_ami/stream.rb index 6a2ff56..44635d5 100644 --- a/lib/ruby_ami/stream.rb +++ b/lib/ruby_ami/stream.rb @@ -29,6 +29,7 @@ def initialize(host, port, username, password, event_callback, logger = Logger, @lexer = Lexer.new self @sent_actions = {} @causal_actions = {} + async.run end [:started, :stopped, :ready].each do |state| @@ -125,7 +126,7 @@ def dispatch_action(*args, &block) end def fire_event(event) - @event_callback.call event + @event_callback.call event, current_actor end def register_sent_action(action) diff --git a/spec/ruby_ami/stream_spec.rb b/spec/ruby_ami/stream_spec.rb index e687bb8..f8df92f 100644 --- a/spec/ruby_ami/stream_spec.rb +++ b/spec/ruby_ami/stream_spec.rb @@ -10,9 +10,9 @@ def client end before do - def client.message_received(message) + def client.message_received(message, stream) @messages ||= Queue.new - @messages << message + @messages << [message, stream] end def client.messages @@ -33,8 +33,7 @@ def mocked_server(times = nil, fake_client = nil, &block) mock_target = MockServer.new mock_target.should_receive(:receive_data).send(*(times ? [:exactly, times] : [:at_least, 1])).with &block s = ServerMock.new '127.0.0.1', server_port, mock_target - @stream = Stream.new '127.0.0.1', server_port, username, password, lambda { |m| client.message_received m } - @stream.async.run + @stream = Stream.new '127.0.0.1', server_port, username, password, lambda { |m, stream| client.message_received m, stream } fake_client.call if fake_client.respond_to? :call Celluloid::Actor.join s Timeout.timeout 5 do @@ -42,26 +41,18 @@ def mocked_server(times = nil, fake_client = nil, &block) end end - def expect_connected_event - client.should_receive(:message_received).with Stream::Connected.new - end - - def expect_disconnected_event - client.should_receive(:message_received).with Stream::Disconnected.new - end - before { @sequence = 1 } describe "after connection" do it "should be started" do - expect_connected_event - expect_disconnected_event mocked_server 0, -> { @stream.started?.should be_true } + client_messages.should be == [ + [Stream::Connected.new, @stream], + [Stream::Disconnected.new, @stream], + ] end it "stores the reported AMI version" do - expect_connected_event - expect_disconnected_event mocked_server(1, lambda { @stream.send_action('Command') # Just to get the server kicked in to replying using the below block expect(@stream.version).to eq('2.8.0') @@ -79,8 +70,6 @@ def expect_disconnected_event end it "can send an action" do - expect_connected_event - expect_disconnected_event mocked_server(1, lambda { @stream.send_action('Command') }) do |val, server| val.should == <<-ACTION Action: command\r @@ -98,8 +87,6 @@ def expect_disconnected_event end it "can send an action with headers" do - expect_connected_event - expect_disconnected_event mocked_server(1, lambda { @stream.send_action('Command', 'Command' => 'RECORD FILE evil') }) do |val, server| val.should == <<-ACTION Action: command\r @@ -148,8 +135,6 @@ def expect_disconnected_event let(:password) { 'jones' } it "should log itself in" do - expect_connected_event - expect_disconnected_event mocked_server(1, lambda { }) do |val, server| val.should == <<-ACTION Action: login\r @@ -183,18 +168,13 @@ def expect_disconnected_event end client_messages.should be == [ - Stream::Connected.new, - Event.new('Hangup', 'Channel' => 'SIP/101-3f3f', 'Uniqueid' => '1094154427.10', 'Cause' => '0'), - Stream::Disconnected.new + [Stream::Connected.new, @stream], + [Event.new('Hangup', 'Channel' => 'SIP/101-3f3f', 'Uniqueid' => '1094154427.10', 'Cause' => '0'), @stream], + [Stream::Disconnected.new, @stream], ] end describe 'when a response is received' do - before do - expect_connected_event - expect_disconnected_event - end - it 'should be returned from #send_action' do response = nil mocked_server(1, lambda { response = @stream.send_action 'Command', 'Command' => 'RECORD FILE evil' }) do |val, server| @@ -219,7 +199,7 @@ def expect_disconnected_event EVENT end - + response.should == Response.new('ActionID' => RubyAMI.new_uuid, 'Message' => 'Thanks for all the fish.') end @@ -304,12 +284,14 @@ def expect_disconnected_event end it 'puts itself in the stopped state and fires a disconnected event when unbound' do - expect_connected_event - expect_disconnected_event mocked_server(1, lambda { @stream.send_data 'Foo' }) do |val, server| @stream.stopped?.should be false end @stream.alive?.should be false + client_messages.should be == [ + [Stream::Connected.new, @stream], + [Stream::Disconnected.new, @stream], + ] end end