/
ami.cr
356 lines (305 loc) · 10.5 KB
/
ami.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
require "socket"
require "socket/tcp_socket"
require "mutex"
require "uuid"
require "./logger.cr"
require "./ami/*"
module Asterisk
class AMI
WAIT_FOR_ANSWER = 0.0005
alias EventName = String
alias ActionID = String
alias AMIData = Hash(String, String | Array(String))
getter logger : Logger = Asterisk.logger
getter ami_version : String?
getter asterisk_version : String?
getter asterisk_platform : String?
@conn = TCPSocket.new
@conn_lock = Mutex.new
@connected = false
@running = false
@fully_booted = false
@event_callbacks = Hash(EventName, Proc(AMI, Event, Nil)).new
@responses = {} of ActionID => Array(Response)
class LoginError < Exception
end
class NotBootedError < Exception
end
class ConnectionError < Exception
end
# close client and raise error
private def raise(ex : Exception)
close
::raise ex
end
# on_close callback
def on_close(&@on_close : AMI ->)
end
# on_event callback (event name, AMI instance, event body)
def on_event(event : EventName, &block : AMI, Event ->)
@event_callbacks[event.to_s.downcase] = block
end
def initialize(@host = "127.0.0.1", @port : String | Int32 = 5038, @username = "", @secret = "", @logger : Logger = Asterisk.logger)
end
def connected?
@connected && running? && fully_booted?
end
def login
raise LoginError.new("Already connected, logoff first") if @connected
@conn = TCPSocket.new(@host, @port)
@conn.sync = true
@conn.keepalive = false
run
response = send_action({"action" => "Login",
"username" => @username,
"secret" => @secret})
if response.success?
# {"unknown" => "Asterisk Call Manager/2.10.5",
# "response" => "Success",
# "message" => "Authentication accepted"}
@ami_version = response["unknown"].as(String).split("/").last
@connected = true
# AMI should enqueue FullyBooted event that will be processed by runner
sleep 0.03
unless fully_booted?
raise NotBootedError.new("AMI shoud respond with FullyBooted event after successful login")
end
# last thing, get asterisk version
version_information = command("core show version")
version_information =~ /Asterisk(?:\s+certified\/|\s+)(\S+)\s+.+on a\s+(\S+)/
@asterisk_version = $1 rescue nil
@asterisk_platform = $2 rescue nil
logger.debug "#{self.class}.login: Logged in"
else
raise LoginError.new(response.message)
end
end
def connect; login; end
def start; login; end
def logoff
if running?
response = send_action({"action" => "Logoff"})
# {"response" => "Goodbye", "message" => "Thanks for all the fish."}
if response.response == "Goodbye"
logger.debug "#{self.class}.logoff: Logged off"
else
logger.error "#{self.class}.logoff: Logged off with incorrect response: #{response}"
end
end
close
end
def command(command : String) : String | Array(String)
result = send_action({"action" => "Command", "command" => command}).output
if result.size == 1
result.first
else
result
end
end
# send action asynchronously
def send_action!(action : AMIData)
conn_send(action)
end
def send_action(action : AMIData, expects_answer_before : Float64 = 0.3)
actionid = action["actionid"] ||= %(#{action["action"]}-#{UUID.random.to_s})
# conneciton listener will push responses to this container
responses = [] of Response
@responses[actionid] = responses
conn_send(action)
waited = 0
multiple_responses = false
loop do
sleep WAIT_FOR_ANSWER
waited += WAIT_FOR_ANSWER
# got response(s)?
first = responses.first?
last = responses.last?
# expecting to receive more data
if first && first["eventlist"]?.to_s =~ /start/i
multiple_responses = true
end
# that were last record
if multiple_responses && last && last["eventlist"]?.to_s =~ /Complete/i
break
end
# got one record and it is not eventlist-like response
# in this case result is a single response
break if first && ! multiple_responses
# timeout
if waited >= expects_answer_before
responses.push Response.new({"response" => "Timeout", "actionid" => actionid})
break
end
end
# remove mapping actionid => responses
@responses.delete(actionid)
if multiple_responses
response = responses.shift
response.events = [] of AMIData
responses.each {|r| response.events.as(Array(AMIData)).push r.data }
response
else
responses.first
end
end
# Format action as a multiline string delimited by "\r\n" and send it
# through AMI TCPSocket connection
private def conn_send(action : AMIData)
synchronize do
# Asterisk AMI action is a multi-line string delimited by "\r\n",
# following by empty string ("\r\n\r\n" is a termination characters)
action_s = ""
action.each do |k, v|
action_s += "#{k}: #{v}\r\n"
end
action_s += "\r\n"
@conn << action_s
rescue ex
raise ex
end
end
private def synchronize
@conn_lock.synchronize do
yield
end
end
private def run
@running = true
spawn do
logger.debug "#{self.class}.run: Starting"
while running?
io_data = conn_read
data = format(io_data)
logger.debug "#{self.class}.run: Formatted data: #{data.inspect}"
# Are asterisk get terminated elsewhere?
# (empty strings are coming to the AMI interface in some cases of
# forced process termination; in such case Asterisk does not send
# "Shutdown" event
if data.to_h == {"unknown" => ""}
close
next
end
if data.actionid?
container = @responses[data["actionid"]]?
container.push data unless container.nil?
end
# Rare case: data is not response nor event
if data["response"]?.nil? && data["event"]?.nil?
logger.error "#{self.class}.run: Don't know who should receive this: #{data.inspect}"
end
if data.is_a?(Event)
# FullyBooted event raised by AMI when all Asterisk initialization
# procedures have finished.
@fully_booted = true if data.event == "FullyBooted"
# Does asterisk get terminated elsewhere?
close if data.event == "Shutdown"
# do something with data, process hooks etc!
trigger_callback data
end
end
end
end
# Read AMI data, which is event, or response/confirmation to the enqueued
# action. AMI always return data as a set of multiple strings
# delimitered by "\r\n" with one empty string at the end ("\r\n\r\n")
private def conn_read : String
data = @conn.gets("\r\n\r\n").to_s
# logger.debug "#{self.class}.conn_read: <<< AMI data received: #{data}\n---i"
data
rescue IO::Timeout
raise ConnectionError.new("TCPSocket timeout error")
rescue ex
# Connection error triggered after @conn.close could be ignored
if running?
raise ex
else
""
end
end
# `format` process each line of given multi-line string (array), splitting
# it into key => value pair
private def format(data : String) : Response
# convert input data (multi-line string to the array of strings)
data = data.gsub(/\r\n\r\n$/, "").split("\r\n")
result = AMIData.new
cli_command = false
previous_key = ""
data.each do |line|
# Normally Asterisk manager event or confirmation for action containing
# key-value pair delimited by ": ", except string confirmations or data
# for user-event without delimiter, these will be assigned as unknown key
# Examples:
#
# "event: SuccessfulAuth" => ["event", "SuccessfulAuth"]
#
# "CoreShowChannels: List currently active channels. (Priv: system,reporting,all)" =>
# ["CoreShowChannels", "List currently active channels. (Priv: system,reporting,all)"]
# Logic for, after the actionid it follow up with resulting data
# ```
# {"action" => "Command", "command" => "..."}
# ```
if cli_command && previous_key == "actionid"
result["output"] = line.gsub(/--END COMMAND--$/, "").chomp.split("\n")
break
end
if line =~ /(^[\w\s\/-]*):[\s]*(.*)$/m
previous_key = key = $1.to_s.downcase
value = $2.to_s
# if key already present, then value is an array
if result.has_key?(key)
if result[key].is_a?(Array)
result[key].as(Array).push value
else
result[key] = [result[key].as(String)].push value
end
else
result[key] = value
end
# Asterisk 13, with multi-line output
if key == "response" && value == "Follows"
cli_command = true
end
# there were no delimiter, will assign data to the "unknown" key
else
key = "unknown"
if result.has_key?(key)
if result[key].is_a?(Array)
result[key].as(Array).push line
else
result[key] = [result[key].as(String)].push line
end
else
result[key] = line
end
end
end
# Make "output" equal for asterisk 13 and 16
if result.has_key?("output") && result["output"].is_a?(String)
result["output"] = [result["output"].as(String)]
end
if result.has_key?("event")
Event.new(result)
else
Response.new(result)
end
end
private def trigger_callback(event : Event)
name = event.event.to_s.downcase
@event_callbacks[name]?.try &.call(self, event)
end
private def close
return unless running?
@connected = false
@running = false
@fully_booted = false
@conn.close
@on_close.try &.call(self)
end
private def running?
@running
end
private def fully_booted?
@fully_booted
end
end
end