Description: Open-source framework for writing voice-enabled applications using Ruby.
Homepage: http://adhearsion.com
Clone URL: git://github.com/jicksta/adhearsion.git
Jay Phillips (author)
Thu Jun 11 10:55:40 -0700 2009
commit  43d2f2144a84db6b60fabe668b04dc868df5f405
tree    9229e33f0610f69d88cc746037ff964217c20e9b
parent  f70167313a63f87fd97a440153ffb36e029f7b0a
adhearsion / lib / adhearsion / voip / asterisk / manager_interface.rb
100644 598 lines (519 sloc) 28.044 kb
require 'adhearsion/voip/asterisk/manager_interface/ami_lexer'
 
module Adhearsion
  module VoIP
    module Asterisk
      
      ##
      # Sorry, this AMI class has been deprecated. Please see http://docs.adhearsion.com/Asterisk_Manager_Interface for
      # documentation on the new way of handling AMI. This new version is much better and should not require an enormous
      # migration on your part.
      #
      class AMI
        def initialize
          raise "Sorry, this AMI class has been deprecated. Please see http://docs.adhearsion.com/Asterisk_Manager_Interface for documentation on the new way of handling AMI. This new version is much better and should not require an enormous migration on your part."
        end
      end
      
      mattr_accessor :manager_interface
      
      module Manager
        
        ##
        # This class abstracts a connection to the Asterisk Manager Interface. Its purpose is, first and foremost, to make
        # the protocol consistent. Though the classes employed to assist this class (ManagerInterfaceAction,
        # ManagerInterfaceResponse, ManagerInterfaceError, etc.) are relatively user-friendly, they're designed to be a
        # building block on which to build higher-level abstractions of the Asterisk Manager Interface.
        #
        # For a higher-level abstraction of the Asterisk Manager Interface, see the SuperManager class.
        #
        class ManagerInterface
          
          class << self
            
            def connect(*args)
              returning new(*args) do |connection|
                connection.connect!
              end
            end
            
            def replies_with_action_id?(name, headers={})
              name = name.to_s.downcase
              # TODO: Expand this case statement
              case name
                when "queues", "iaxpeers"
                  false
                else
                  true
              end
            end
            
            ##
            # When sending an action with "causal events" (i.e. events which must be collected to form a proper
            # response), AMI should send a particular event which instructs us that no more events will be sent.
            # This event is called the "causal event terminator".
            #
            # Note: you must supply both the name of the action and any headers because it's possible that some uses of an
            # action (i.e. same name, different headers) have causal events while other uses don't.
            #
            # @param [String] name the name of the action
            # @param [Hash] headers the headers associated with this action
            # @return [Boolean] true if this action is answered with a sequence of causal events
            #
            def has_causal_events?(name, headers={})
              name = name.to_s.downcase
              case name
                when "queuestatus", "sippeers", "parkedcalls", "status"
                  true
                else
                  false
              end
            end
            
            ##
            # Used to determine the event name for an action which has causal events.
            #
            # @param [String] action_name
            # @return [String] The corresponding event name which signals the completion of the causal event sequence.
            #
            def causal_event_terminator_name_for(action_name)
              return nil unless has_causal_events?(action_name)
              action_name = action_name.to_s.downcase
              case action_name
                when "queuestatus", "parkedcalls", "status"
                  action_name + "complete"
                when "sippeers"
                  "peerlistcomplete"
              end
            end
            
          end
        
          DEFAULT_SETTINGS = {
            :host => "localhost",
            :port => 5038,
            :username => "admin",
            :password => "secret",
            :events => true
          }.freeze unless defined? DEFAULT_SETTINGS
          
          attr_reader *DEFAULT_SETTINGS.keys
          
          ##
          # Creates a new Asterisk Manager Interface connection and exposes certain methods to control it. The constructor
          # takes named parameters as Symbols. Note: if the :events option is given, this library will establish a separate
          # socket for just events. Two sockets are used because some actions actually respond with events, making it very
          # complicated to differentiate between response-type events and normal events.
          #
          # @param [Hash] options Available options are :host, :port, :username, :password, and :events
          #
          def initialize(options={})
            options = parse_options options
            
            @host = options[:host]
            @username = options[:username]
            @password = options[:password]
            @port = options[:port]
            @events = options[:events]
            
            @sent_messages = {}
            @sent_messages_lock = Mutex.new
            
            @actions_lexer = DelegatingAsteriskManagerInterfaceLexer.new self, \
                :message_received => :action_message_received,
                :error_received => :action_error_received
            
            @write_queue = Queue.new
            
            if @events
              @events_lexer = DelegatingAsteriskManagerInterfaceLexer.new self, \
                  :message_received => :event_message_received,
                  :error_received => :event_error_received
            end
          end
          
          def action_message_received(message)
            if message.kind_of? Manager::ManagerInterfaceEvent
              # Trigger the return value of the waiting action id...
              corresponding_action = @current_action_with_causal_events
              event_collection = @event_collection_for_current_action
              
              if corresponding_action
                
                # Check whether this is the meta-event signaling that no more events will follow and the response is complete.
                if message.name.downcase == corresponding_action.causal_event_terminator_name
                  
                  # Result found! Wake up any Threads waiting
                  corresponding_action.future_resource.resource = event_collection.freeze
                  
                  @current_action_with_causal_events = nil
                  @event_collection_for_current_action = nil
                  
                else
                  event_collection << message
                  # We have more causal events coming.
                end
              else
                ahn_log.ami.error "Got an unexpected event on actions socket! This may be a bug! #{message.inspect}"
              end
              
            elsif message["ActionID"].nil?
              # No ActionID! Release the write lock and wake up the waiter
            else
              action_id = message["ActionID"]
              corresponding_action = data_for_message_received_with_action_id action_id
              if corresponding_action
                message.action = corresponding_action
                
                if corresponding_action.has_causal_events?
                  # By this point the write loop will already have started blocking by calling the response() method on the
                  # action. Because we must collect more events before we wake the write loop up again, let's create these
                  # instance variables, which will be needed when the subsequent causal events come in.
                  @current_action_with_causal_events = corresponding_action
                  @event_collection_for_current_action = []
                else
                  # Wake any Threads waiting on the response.
                  corresponding_action.future_resource.resource = message
                end
              else
                ahn_log.ami.error "Received an AMI message with an unrecognized ActionID!! This may be a bug! #{message.inspect}"
              end
            end
          end
          
          def action_error_received(ami_error)
            action_id = ami_error["ActionID"]
            
            corresponding_action = data_for_message_received_with_action_id action_id
 
            if corresponding_action
              corresponding_action.future_resource.resource = ami_error
            else
              ahn_log.ami.error "Received an AMI error with an unrecognized ActionID!! This may be a bug! #{ami_error.inspect}"
            end
          end
          
          ##
          # Called only when this ManagerInterface is instantiated with events enabled.
          #
          def event_message_received(event)
            return if event.kind_of?(ManagerInterfaceResponse) && event["Message"] == "Authentication accepted"
            # TODO: convert the event name to a certain namespace.
            Events.trigger %w[asterisk manager_interface], event
          end
          
          def event_error_received(message)
            # Does this ever even occur?
            ahn_log.ami.error "Hmmm, got an error on the AMI events-only socket! This must be a bug! #{message.inspect}"
          end
          
          ##
          # Called when our Ragel parser encounters some unexpected syntax from Asterisk. Anytime this is called, it should
          # be considered a bug in Adhearsion. Note: this same method is called regardless of whether the syntax error
          # happened on the actions socket or on the events socket.
          #
          def syntax_error_encountered(ignored_chunk)
            ahn_log.ami.error "ADHEARSION'S AMI PARSER ENCOUNTERED A SYNTAX ERROR! " +
                "PLEASE REPORT THIS ON http://bugs.adhearsion.com! OFFENDING TEXT:\n#{ignored_chunk.inspect}"
          end
          
          ##
          # Must be called after instantiation. Also see ManagerInterface::connect().
          #
          # @raise [AuthenticationFailedException] if username or password are rejected
          #
          def connect!
            establish_actions_connection
            establish_events_connection if @events
            self
          end
          
          def actions_connection_established
            @actions_state = :connected
            @actions_writer_thread = Thread.new(&method(:write_loop))
          end
          
          def actions_connection_disconnected
            @actions_state = :disconnected
          end
          
          def events_connection_established
            @events_state = :connected
          end
          
          def events_connection_disconnected
            @events_state = :disconnected
          end
          
          def disconnect!
            # PSEUDO CODE
            # TODO: Go through all the waiting condition variables and raise an exception
            #@write_queue << :STOP!
            raise NotImplementedError
          end
        
          def dynamic
            # TODO: Return an object which responds to method_missing
          end
                    
          ##
          # Used to directly send a new action to Asterisk. Note: NEVER supply an ActionID; these are handled internally.
          #
          # @param [String, Symbol] action_name The name of the action (e.g. Originate)
          # @param [Hash] headers Other key/value pairs to send in this action. Note: don't provide an ActionID
          # @return [ManagerInterfaceAction] Call response() on this object if you wish to access the response (optional). Note: if the response has not come in yet, your Thread will wait until it does.
          #
          def send_action_asynchronously(action_name, headers={})
            check_action_name action_name
            action = ManagerInterfaceAction.new(action_name, headers)
            if action.replies_with_action_id?
              @write_queue << action
              action
            else
              raise NotImplementedError
            end
          end
          
          ##
          # Sends an action over the AMI connection and blocks your Thread until the response comes in. If Asterisk
          # reports an error, the error will be raised as a ManagerInterfaceError.
          #
          # @param [String, Symbol] action_name The name of the action (e.g. Originate)
          # @param [Hash] headers Other key/value pairs to send in this action. Note: don't provide an ActionID
          # @raise [ManagerInterfaceError] When Asterisk can't execute this action, it sends back an Error which is converted into a ManagerInterfaceError object and raised. Access ManagerInterfaceError#message for the reported message from Asterisk.
          # @return [ManagerInterfaceResponse, ImmediateResponse] Contains the response from Asterisk and all headers
          #
          def send_action_synchronously(*args)
            returning send_action_asynchronously(*args).response do |response|
              raise response if response.kind_of?(ManagerInterfaceError)
            end
          end
          
          alias send_action send_action_synchronously
        
        
          ####### #######
          ########### ###########
          ################# SOON-DEPRECATED COMMANDS #################
          ########### ###########
          ####### #######
          
          # ping sends an action to the Asterisk Manager Interface that returns a pong
          # more details here: http://www.voip-info.org/wiki/index.php?page=Asterisk+Manager+API+Action+Ping
          def ping
            deprecation_warning
            send_action "Ping"
            true
          end
          
          def deprecation_warning
            ahn_log.ami.deprecation.warn "The implementation of the ping, originate, introduce, hangup, call_into_context " +
                "and call_and_exec methods will soon be moved from this class to SuperManager. At the moment, the " +
                "SuperManager abstractions are not completed. Don't worry. The migration to SuperManager will be very easy."+
                " See http://docs.adhearsion.com/AMI for more information."
          end
          
          # The originate method launches a call to Asterisk, full details here:
          # http://www.voip-info.org/tiki-index.php?page=Asterisk+Manager+API+Action+Originate
          # Takes these arguments as a hash:
          #
          # Channel: Channel on which to originate the call (The same as you specify in the Dial application command)
          # Context: Context to use on connect (must use Exten & Priority with it)
          # Exten: Extension to use on connect (must use Context & Priority with it)
          # Priority: Priority to use on connect (must use Context & Exten with it)
          # Timeout: Timeout (in milliseconds) for the originating connection to happen (defaults to 30000 milliseconds)
          # CallerID: CallerID to use for the call
          # Variable: Channel variables to set (max 32). Variables will be set for both channels (local and connected).
          # Account: Account code for the call
          # Application: Application to use on connect (use Data for parameters)
          # Data: Data if Application parameter is used
          # Async: For the origination to be asynchronous (allows multiple calls to be generated without waiting for a response)
          # ActionID: The request identifier. It allows you to identify the response to this request.
          # You may use a number or a string. Useful when you make several simultaneous requests.
          #
          # For example:
          #   originate :channel => 'SIP/1000@sipnetworks.com',
          #             :context => 'my_context',
          #             :exten => 's',
          #             :priority => '1'
          def originate(options={})
            deprecation_warning
            options = options.clone
            options[:callerid] = options.delete :caller_id if options.has_key? :caller_id
            options[:exten] = options.delete :extension if options.has_key? :extension
            send_action "Originate", options
          end
 
          # An introduction connects two endpoints together. The first argument is
          # the first person the PBX will call. When she's picked up, Asterisk will
          # play ringing while the second person is being dialed.
          #
          # The first argument is the person called first. Pass this as a canonical
          # IAX2/server/user type argument. Destination takes the same format, but
          # comma-separated Dial() arguments can be optionally passed after the
          # technology.
          #
          # TODO: Provide an example when this works.
          #
          def introduce(caller, callee, opts={})
            deprecation_warning
            dial_args = callee
            dial_args += "|#{opts[:options]}" if opts[:options]
            call_and_exec caller, "Dial", :args => dial_args, :caller_id => opts[:caller_id]
          end
 
          # hangup terminates a call. It accepts a channel as the argument.
          # full details here: http://www.voip-info.org/wiki/index.php?page=Asterisk+Manager+API+Action+Hangup
          def hangup(channel)
            deprecation_warning
            send_action "Hangup", :channel => channel
          end
 
          # call_and_exec allows you to make a call to a channel and then execute an Asterisk application
          # on that call
          def call_and_exec(channel, app, opts={})
            deprecation_warning
            args = { :channel => channel, :application => app }
            args[:caller_id] = opts[:caller_id] if opts[:caller_id]
            args[:data] = opts[:args] if opts[:args]
            originate args
          end
 
          # call_into_context is syntactic sugar for the Asterisk originate command that allows you to
          # launch a call into a particular context. For example:
          #
          # call_into_context('SIP/1000@sipnetworks.com', 'my_context', { :variables => { :session_guid => new_guid }})
          def call_into_context(channel, context, options={})
            deprecation_warning
            args = {:channel => channel, :context => context}
            args[:priority] = options[:priority] || 1
            args[:exten] = options[:extension] if options[:extension]
            args[:caller_id] = options[:caller_id] if options[:caller_id]
            if options[:variables] && options[:variables].kind_of?(Hash)
              args[:variable] = options[:variables].map {|pair| pair.join('=')}.join('|')
            end
            originate args
          end
          
          ####### #######
          ########### ###########
          ################# END SOON-DEPRECATED COMMANDS #################
          ########### ###########
          ####### #######
 
        
          protected
          
          ##
          # This class will be removed once this AMI library fully supports all known protocol anomalies.
          #
          class UnsupportedActionName < ArgumentError
            UNSUPPORTED_ACTION_NAMES = %w[
              queues
              iaxpeers
            ] unless defined? UNSUPPORTED_ACTION_NAMES
            def initialize(name)
              super "At the moment this AMI library doesn't support the #{name.inspect} action because it causes a protocol anomaly. Support for it will be coming shortly."
            end
            
          end
          
          def check_action_name(name)
            name = name.to_s.downcase
            raise UnsupportedActionName.new(name) if UnsupportedActionName::UNSUPPORTED_ACTION_NAMES.include? name
            true
          end
          
          def write_loop
            loop do
              next_action = @write_queue.shift
              return :stopped if next_action.equal? :STOP!
              register_action_with_metadata next_action
              
              ahn_log.ami.debug "Sending AMI action: #{"\n>>> " + next_action.to_s.gsub(/(\r\n)+/, "\n>>> ")}"
              @actions_connection.send_data next_action.to_s
              # If it's a "causal event" action, we must wait here until it has been fully answered
              next_action.response if next_action.has_causal_events?
            end
          rescue => e
            p e
          end
          
          ##
          # When we send out an AMI action, we need to track the ActionID and have the other Thread handling the socket IO
          # notify the sending Thread that a response has been received. This method instantiates a new FutureResource and
          # keeps it around in a synchronized Hash for the IO-handling Thread to notify when a response with a matching
          # ActionID is seen again. See also data_for_message_received_with_action_id() which is how the IO-handling Thread
          # gets the metadata registered in this method back later.
          #
          # @param [ManagerInterfaceAction] action The ManagerInterfaceAction to send
          #
          def register_action_with_metadata(action)
            raise ArgumentError, "Must supply an action!" if action.nil?
            @sent_messages_lock.synchronize do
              @sent_messages[action.action_id] = action
            end
          end
          
          def data_for_message_received_with_action_id(action_id)
            @sent_messages_lock.synchronize do
              @sent_messages.delete action_id
            end
          end
          
          ##
          # Instantiates a new ManagerInterfaceActionsConnection and assigns it to @actions_connection.
          #
          # @return [EventSocket]
          #
          def establish_actions_connection
            @actions_connection = EventSocket.connect(@host, @port) do |handler|
              handler.receive_data { |data| @actions_lexer << data }
              handler.connected { actions_connection_established }
              handler.disconnected { actions_connection_disconnected }
            end
            login_actions
          end
          
          ##
          # Instantiates a new ManagerInterfaceEventsConnection and assigns it to @events_connection.
          #
          # @return [EventSocket]
          #
          def establish_events_connection
            
            # Note: the @events_connection instance variable is assigned here, before login_events() is called
            @events_connection = EventSocket.connect(@host, @port) do |handler|
              handler.receive_data { |data| @events_lexer << data }
              handler.connected { events_connection_established }
              handler.disconnected { events_connection_disconnected }
            end
            login_events
            ahn_log.ami "Successful AMI events-only connection into #{@username}@#{@host}"
          end
 
          def login_actions
            action = send_action_asynchronously "Login", "Username" => @username, "Secret" => @password, "Events" => "Off"
            response = action.response
            if response.kind_of? ManagerInterfaceError
              raise AuthenticationFailedException, "Incorrect username and password! #{response.message}"
            else
              ahn_log.ami "Successful AMI actions-only connection into #{@username}@#{@host}"
              response
            end
          end
          
          ##
          # Since this method is always called after the login_actions method, an AuthenticationFailedException would have already
          # been raised if the username/password were off. Because this is the only action we ever need to send on this socket,
          # it goes straight to the EventSocket connection (bypassing the @write_queue).
          #
          def login_events
            login_action = ManagerInterfaceAction.new "Login", "Username" => @username, "Secret" => @password, "Events" => "On"
            @events_connection.send_data login_action.to_s
          end
          
          def parse_options(options)
            unrecognized_keys = options.keys.map { |key| key.to_sym } - DEFAULT_SETTINGS.keys
            if unrecognized_keys.any?
              raise ArgumentError, "Unrecognized named argument(s): #{unrecognized_keys.to_sentence}"
            end
            DEFAULT_SETTINGS.merge options
          end
          
          ##
          # Raised when calling ManagerInterface#connect!() and the server responds with an error after logging in.
          #
          class AuthenticationFailedException < Exception; end
          
          class NotConnectedError < Exception; end
          
          ##
          # Each time ManagerInterface#send_action is invoked, a new ManagerInterfaceAction is instantiated.
          #
          class ManagerInterfaceAction
            
            attr_reader :name, :headers, :future_resource, :action_id, :causal_event_terminator_name
            def initialize(name, headers={})
              @name = name.to_s.downcase.freeze
              @headers = headers.stringify_keys.freeze
              @action_id = new_action_id.freeze
              @future_resource = FutureResource.new
              @causal_event_terminator_name = ManagerInterface.causal_event_terminator_name_for name
            end
            
            ##
            # Used internally by ManagerInterface for the actions in AMI which break the protocol's definition and do not
            # reply with an ActionID.
            #
            def replies_with_action_id?
              ManagerInterface.replies_with_action_id?(@name, @headers)
            end
            
            ##
            # Some AMI actions effectively respond with many events which collectively constitute the actual response. These
            # must be handled specially by the protocol parser, so this method helps inform the parser.
            #
            def has_causal_events?
              ManagerInterface.has_causal_events?(@name, @headers)
            end
            
            ##
            # Abstracts the generation of new ActionIDs. This could be implemented virtually any way, provided each
            # invocation returns something unique, so this will generate a GUID and return it.
            #
            # @return [String] characters in GUID format (e.g. "4C5F4E1C-A0F1-4D13-8751-C62F2F783062")
            #
            def new_action_id
              new_guid # Implemented in lib/adhearsion/foundation/pseudo_guid.rb
            end
            
            ##
            # Converts this action into a protocol-valid String, ready to be sent over a socket.
            #
            def to_s
              @textual_representation ||= (
                  "Action: #{@name}\r\nActionID: #{@action_id}\r\n" +
                  @headers.map { |(key,value)| "#{key}: #{value}" }.join("\r\n") +
                  (@headers.any? ? "\r\n\r\n" : "\r\n")
              )
            end
            
            ##
            # If the response has simply not been received yet from Asterisk, the calling Thread will block until it comes
            # in. Once the response comes in, subsequent calls immediately return a reference to the ManagerInterfaceResponse
            # object.
            #
            def response
              future_resource.resource
            end
                        
          end
        end
      end
    end
  end
end
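
The packet layout built by ManagerInterfaceAction#to_s can be tried in isolation. The following is a minimal sketch, not part of Adhearsion: serialize_ami_action is a hypothetical helper name, and the action ID is passed in by hand rather than generated with new_guid. It only mirrors the string layout seen in to_s above (CRLF-separated "Key: Value" lines, terminated by a blank line).

```ruby
# Hypothetical helper (an assumption for illustration, not an Adhearsion API).
# Builds the same text layout as ManagerInterfaceAction#to_s: an Action line,
# an ActionID line, any extra headers, then an empty line ending the packet.
def serialize_ami_action(name, action_id, headers = {})
  lines = ["Action: #{name}", "ActionID: #{action_id}"]
  headers.each { |key, value| lines << "#{key}: #{value}" }
  # Every line ends in CRLF, and one extra CRLF terminates the packet.
  lines.join("\r\n") + "\r\n\r\n"
end

# Example: a login packet with made-up credentials and a made-up ActionID.
packet = serialize_ami_action("login", "example-id-1",
                              "Username" => "admin", "Secret" => "secret")
puts packet.inspect
```

With no extra headers the result is just the Action and ActionID lines followed by the terminating blank line, which matches the two branches of the ternary in to_s.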
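
The way a waiting Thread is woken once all causal events have arrived can also be sketched on its own. This is an illustration under stated assumptions: SketchFuture is a hypothetical stand-in for Adhearsion's FutureResource (whose real implementation is not shown in this file), collect_causal_events is a made-up helper, and events are represented as plain name Strings instead of ManagerInterfaceEvent objects.

```ruby
require 'thread'

# Hypothetical stand-in for FutureResource (an assumption, not the real class).
# resource blocks the caller until resource= has been called from another Thread.
class SketchFuture
  def initialize
    @lock = Mutex.new
    @condition = ConditionVariable.new
    @set = false
    @value = nil
  end

  # Blocks until a value is available, then returns it.
  def resource
    @lock.synchronize do
      @condition.wait(@lock) until @set
      @value
    end
  end

  # Stores the value and wakes every Thread blocked in resource.
  def resource=(value)
    @lock.synchronize do
      @value = value
      @set = true
      @condition.broadcast
    end
  end
end

# Hypothetical helper: gathers event names until the terminator is seen, then
# hands the frozen collection to the future, similar in spirit to the causal
# events branch of action_message_received.
def collect_causal_events(event_names, terminator_name, future)
  collected = []
  event_names.each do |event_name|
    if event_name.downcase == terminator_name
      future.resource = collected.freeze
      return
    end
    collected << event_name
  end
end

future = SketchFuture.new
reader = Thread.new do
  collect_causal_events(%w[PeerEntry PeerEntry PeerlistComplete], "peerlistcomplete", future)
end
events = future.resource # blocks until the terminator event has been processed
reader.join
puts events.inspect
```

The terminator itself is not added to the collection, so the caller only sees the events that make up the response.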