Navigation Menu

Skip to content

Commit

Permalink
Make Stream usable in a supervision tree
Browse files Browse the repository at this point in the history
Supervisors don't tolerate a secondary startup step. As such, we won't have a handle on the stream before it's already receiving events, so we need to pass the stream to the event handler in order for it to respond.
  • Loading branch information
benlangfeld committed Jan 8, 2016
1 parent 3e62051 commit d9b806a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
# [develop](https://github.com/adhearsion/ruby_ami)
* Breaking change: Removed the deprecated `RubyAMI::Client` because it is no longer relevant.
* Breaking change: Start the connection when the `RubyAMI::Stream` starts, permitting supervised restart of the stream on connection failure. To support this, the event callback now passes a second parameter which is the stream itself.

# [2.4.0](https://github.com/adhearsion/ruby_ami/compare/v2.3.0...v2.4.0) - [2015-12-07](https://rubygems.org/gems/ruby_ami/versions/2.4.0)
# Feature: Reveal the AMI version for a `Stream` via `Stream#version`
Expand Down
32 changes: 20 additions & 12 deletions README.md
Expand Up @@ -21,10 +21,6 @@ In order to setup a connection to listen for AMI events, one can do:
```ruby
require 'ruby_ami'

stream = RubyAMI::Stream.new '127.0.0.1', 5038, 'manager', 'password',
->(e) { handle_event e },
Logger.new(STDOUT), 10

def handle_event(event)
case event.name
when 'FullyBooted'
Expand All @@ -34,28 +30,40 @@ def handle_event(event)
end
end

stream.run # This will block until the actor is terminated elsewhere. stream.async.run is also available if you need to do other things in the main thread.
stream = RubyAMI::Stream.new '127.0.0.1', 5038, 'manager', 'password',
->(e) { handle_event e },
Logger.new(STDOUT), 10

Celluloid.join(stream) # This will block until the actor is terminated elsewhere. Otherwise, the actor will run in its own thread allowing other work to be done here.
```

It is also possible to execute actions in response to events:
Note that using `Stream.new`, the actor will shut down when the connection is lost (and in this case the program will exit). If it is necessary to restart the actor on failure, you can start it in a Celluloid supervisor:

```ruby
require 'ruby_ami'

$stream = RubyAMI::Stream.new '127.0.0.1', 5038, 'manager', 'password',
RubyAMI::Stream.supervise_as :ami_connection, '127.0.0.1', 5038, 'manager', 'password',
->(e) { handle_event e },
Logger.new(STDOUT), 10
```

def handle_event(event)
It is also possible to execute actions in response to events:

```ruby
require 'ruby_ami'

def handle_event(event, stream)
case event.name
when 'FullyBooted'
puts "The connection was successful. Originating a call."
response = $stream.send_action 'Originate', 'Channel' => 'SIP/foo'
response = stream.send_action 'Originate', 'Channel' => 'SIP/foo'
puts "The call origination resulted in #{response.inspect}"
end
end

$stream.run
stream = RubyAMI::Stream.new '127.0.0.1', 5038, 'manager', 'password',
->(e) { handle_event e },
Logger.new(STDOUT), 10

Celluloid.join(stream) # This will block until the actor is terminated elsewhere. Otherwise, the actor will run in its own thread allowing other work to be done here.
```

Executing actions does not strictly have to be done within the event handler, but it is not valid to send AMI events before receiving a `FullyBooted` event. If you attempt to execute an action prior to this, it may fail, and `RubyAMI::Stream` will not help you recover or queue the action until the connection is `FullyBooted`; you must manage this timing yourself. That said, assuming you take care of this, you may invoke `RubyAMI::Stream#send_action` from anywhere in your code and it will return the response of the action.
Expand Down
3 changes: 2 additions & 1 deletion lib/ruby_ami/stream.rb
Expand Up @@ -29,6 +29,7 @@ def initialize(host, port, username, password, event_callback, logger = Logger,
@lexer = Lexer.new self
@sent_actions = {}
@causal_actions = {}
async.run
end

[:started, :stopped, :ready].each do |state|
Expand Down Expand Up @@ -125,7 +126,7 @@ def dispatch_action(*args, &block)
end

def fire_event(event)
@event_callback.call event
@event_callback.call event, current_actor
end

def register_sent_action(action)
Expand Down
48 changes: 15 additions & 33 deletions spec/ruby_ami/stream_spec.rb
Expand Up @@ -10,9 +10,9 @@ def client
end

before do
def client.message_received(message)
def client.message_received(message, stream)
@messages ||= Queue.new
@messages << message
@messages << [message, stream]
end

def client.messages
Expand All @@ -33,35 +33,26 @@ def mocked_server(times = nil, fake_client = nil, &block)
mock_target = MockServer.new
mock_target.should_receive(:receive_data).send(*(times ? [:exactly, times] : [:at_least, 1])).with &block
s = ServerMock.new '127.0.0.1', server_port, mock_target
@stream = Stream.new '127.0.0.1', server_port, username, password, lambda { |m| client.message_received m }
@stream.async.run
@stream = Stream.new '127.0.0.1', server_port, username, password, lambda { |m, stream| client.message_received m, stream }
fake_client.call if fake_client.respond_to? :call
Celluloid::Actor.join s
Timeout.timeout 5 do
Celluloid::Actor.join @stream
end
end

def expect_connected_event
client.should_receive(:message_received).with Stream::Connected.new
end

def expect_disconnected_event
client.should_receive(:message_received).with Stream::Disconnected.new
end

before { @sequence = 1 }

describe "after connection" do
it "should be started" do
expect_connected_event
expect_disconnected_event
mocked_server 0, -> { @stream.started?.should be_true }
client_messages.should be == [
[Stream::Connected.new, @stream],
[Stream::Disconnected.new, @stream],
]
end

it "stores the reported AMI version" do
expect_connected_event
expect_disconnected_event
mocked_server(1, lambda {
@stream.send_action('Command') # Just to get the server kicked in to replying using the below block
expect(@stream.version).to eq('2.8.0')
Expand All @@ -79,8 +70,6 @@ def expect_disconnected_event
end

it "can send an action" do
expect_connected_event
expect_disconnected_event
mocked_server(1, lambda { @stream.send_action('Command') }) do |val, server|
val.should == <<-ACTION
Action: command\r
Expand All @@ -98,8 +87,6 @@ def expect_disconnected_event
end

it "can send an action with headers" do
expect_connected_event
expect_disconnected_event
mocked_server(1, lambda { @stream.send_action('Command', 'Command' => 'RECORD FILE evil') }) do |val, server|
val.should == <<-ACTION
Action: command\r
Expand Down Expand Up @@ -148,8 +135,6 @@ def expect_disconnected_event
let(:password) { 'jones' }

it "should log itself in" do
expect_connected_event
expect_disconnected_event
mocked_server(1, lambda { }) do |val, server|
val.should == <<-ACTION
Action: login\r
Expand Down Expand Up @@ -183,18 +168,13 @@ def expect_disconnected_event
end

client_messages.should be == [
Stream::Connected.new,
Event.new('Hangup', 'Channel' => 'SIP/101-3f3f', 'Uniqueid' => '1094154427.10', 'Cause' => '0'),
Stream::Disconnected.new
[Stream::Connected.new, @stream],
[Event.new('Hangup', 'Channel' => 'SIP/101-3f3f', 'Uniqueid' => '1094154427.10', 'Cause' => '0'), @stream],
[Stream::Disconnected.new, @stream],
]
end

describe 'when a response is received' do
before do
expect_connected_event
expect_disconnected_event
end

it 'should be returned from #send_action' do
response = nil
mocked_server(1, lambda { response = @stream.send_action 'Command', 'Command' => 'RECORD FILE evil' }) do |val, server|
Expand All @@ -219,7 +199,7 @@ def expect_disconnected_event
EVENT
end

response.should == Response.new('ActionID' => RubyAMI.new_uuid, 'Message' => 'Thanks for all the fish.')
end

Expand Down Expand Up @@ -304,12 +284,14 @@ def expect_disconnected_event
end

it 'puts itself in the stopped state and fires a disconnected event when unbound' do
expect_connected_event
expect_disconnected_event
mocked_server(1, lambda { @stream.send_data 'Foo' }) do |val, server|
@stream.stopped?.should be false
end
@stream.alive?.should be false
client_messages.should be == [
[Stream::Connected.new, @stream],
[Stream::Disconnected.new, @stream],
]
end
end

Expand Down

0 comments on commit d9b806a

Please sign in to comment.