-
Notifications
You must be signed in to change notification settings - Fork 50
/
utils.py
208 lines (157 loc) · 5.68 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import re
import uuid
try:
from collections.abc import MutableMapping
except ImportError:
from collections import MutableMapping
from configparser import ConfigParser
EOL = '\r\n'
re_code = re.compile(r'(?P<code>^\d*)\s*(?P<response>.*)')
re_kv = re.compile(r'(?P<key>\w+)=(?P<value>[^\s]+)\s*(?:\((?P<data>.*)\))*')
def parse_agi_result(line):
"""Parse AGI results using Regular expression.
AGI Result examples::
200 result=0
200 result=-1
200 result=132456
200 result= (timeout)
510 Invalid or unknown command
520-Invalid command syntax. Proper usage follows:
int() argument must be a string, a bytes-like object or a number, not
'NoneType'
HANGUP
"""
# print("--------------\n", line)
if line == 'HANGUP':
return {'error': 'AGIResultHangup',
'msg': 'User hungup during execution'}
kwargs = dict(code=0, response="", line=line)
m = re_code.search(line)
try:
kwargs.update(m.groupdict())
except AttributeError:
# None has no attribute groupdict
pass
return agi_code_check(**kwargs)
def agi_code_check(code=None, response=None, line=None):
"""
Check the AGI code and return a dict to help on error handling.
"""
code = int(code)
response = response or ""
result = {'status_code': code, 'result': ('', ''), 'msg': ''}
if code == 200:
for key, value, data in re_kv.findall(response):
result[key] = (value, data)
# If user hangs up... we get 'hangup' in the data
if data == 'hangup':
return {
'error': 'AGIResultHangup',
'msg': 'User hungup during execution'}
elif key == 'result' and value == '-1':
return {
'error': 'AGIAppError',
'msg': 'Error executing application, or hangup'}
elif code == 510:
result['error'] = 'AGIInvalidCommand'
elif code == 520:
# AGI Usage error
result['error'] = 'AGIUsageError'
result['msg'] = line
else:
# Unhandled code or undefined response
result['error'] = 'AGIUnknownError'
return result
class IdGenerator:
"""Generate some uuid for actions:
.. code-block:: python
>>> g = IdGenerator('mycounter')
..
>>> IdGenerator.reset(uid='an_uuid4')
It increments the counter at each calls:
.. code-block:: python
>>> print(g())
mycounter/an_uuid4/1/1
>>> print(g())
mycounter/an_uuid4/1/2
"""
instances = []
def __init__(self, prefix):
self.instances.append(self)
self.prefix = prefix
self.uid = str(uuid.uuid4())
self.generator = self.get_generator()
def get_generator(self):
i = 0
max_val = 10000
while True:
yield "%s/%s/%d/%d" % (self.prefix,
self.uid, (i // max_val) + 1,
(i % max_val) + 1)
i += 1
@classmethod
def reset(cls, uid=None):
"""Mostly used for unit testing. Allow to use a static uuid and reset
all counter"""
for instance in cls.instances:
if uid:
instance.uid = uid
instance.generator = instance.get_generator()
def get_instances(self):
"""Mostly used for debugging"""
return ["<%s prefix:%s (uid:%s)>" % (self.__class__.__name__,
i.prefix, self.uid)
for i in self.instances]
def __call__(self):
return next(self.generator)
def __repr__(self):
return "<%s prefix:%s (uid:%s)>" % (self.__class__.__name__, self.prefix, self.uid)
class CaseInsensitiveDict(MutableMapping):
"""
A case-insensitive ``dict``-like object.
Implements all methods and operations of ``collections.MutableMapping``.
All keys are expected to be strings. The structure remembers the
case of the last key to be set, and ``iter(instance)``,
``keys()``, ``items()``, ``iterkeys()``, and ``iteritems()``
will contain case-sensitive keys. However, querying and contains
testing is case insensitive:
.. code-block:: python
cid = CaseInsensitiveDict()
cid['Action'] = 'SIPnotify'
cid['aCTION'] == 'SIPnotify' # True
list(cid) == ['Action'] # True
For example, ``event['actionid']`` will return the
value of a ``'ActionID'`` response event, regardless
of how the event name was originally stored.
"""
def __init__(self, data=None, **kwargs):
self._store = dict()
self.update(data or {}, **kwargs)
def __setitem__(self, key, value):
# Use the lowercased key for lookups, but store the actual
# key alongside the value.
self._store[key.lower()] = (key, value)
def __contains__(self, key):
return key.lower() in self._store
def __getattr__(self, attr):
return self.get(attr, '')
def __getitem__(self, key):
return self._store[key.lower()][1]
def __delitem__(self, key):
raise NotImplementedError()
def __iter__(self):
return (key for key, value in self._store.values())
def __len__(self):
return len(self._store)
def __repr__(self):
return str(dict(self.items()))
def config(filename_or_fd, section='asterisk'):
config = ConfigParser()
if hasattr(filename_or_fd, 'read'):
if hasattr(config, 'read_file'):
config.read_file(filename_or_fd)
else:
config.readfp(filename_or_fd)
else:
config.read(filename_or_fd)
return dict(config.items(section))