-
Notifications
You must be signed in to change notification settings - Fork 0
/
gammu_controller.py
147 lines (111 loc) · 5.5 KB
/
gammu_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
import threading
from collections import namedtuple
from gammu import StateMachine
from gammu.worker import GammuWorker
from pyphone.controllers.controller import Controller
# Event payloads delivered to callbacks registered via GammuController.bind().
# The event classes themselves double as the keys under which those callbacks
# are registered; each carries the phone number reported for the call.
IncomingCallEvent = namedtuple("IncomingCallEvent", "number")
EndCallEvent = namedtuple("EndCallEvent", "number")
class GammuController(Controller):
    """Controller that owns the Gammu connection and dispatches its results.

    The controller wraps a ``gammu.StateMachine`` (used to read the
    configuration) and a ``GammuWorker`` (used to run commands
    asynchronously). A daemon thread started in ``__init__`` periodically
    either tries to (re)connect or enqueues a ``ReadDevice`` command.

    Attributes:
        connecting: Event that is set while a connection attempt is pending.
        connected: Event that is set once the ``Init`` command succeeded and
            cleared again by ``_disconnect``.
        ongoing_call: Event that is set while a call is considered active.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, root_panel):
        """Create the worker objects and start the reconnect thread.

        Args:
            root_panel: Passed through unchanged to ``Controller.__init__``.
        """
        # All mutable state is created per instance (and before the base
        # initialiser runs, so it is already available there). Keeping the
        # events and callback tables on the class would share them between
        # every controller instance.
        self.connecting = threading.Event()
        self.connected = threading.Event()
        self.ongoing_call = threading.Event()
        self._user_callbacks = {}
        self._system_callbacks = {
            "Init": self._on_init_complete,
            "DialVoice": self._on_dial_voice_complete,
            "AnswerCall": self._on_answer_call_complete,
        }

        super().__init__(root_panel)

        self._state_machine = StateMachine()
        self._gammu_worker = GammuWorker(self._on_gammu_result)

        self._reconnect_thread = threading.Thread(target=self._reconnect_worker, daemon=True)
        self._reconnect_thread.start()

    def _reconnect_worker(self):
        """Run the keep-alive loop until ``self.stopped`` is set.

        Each iteration, unless a connection attempt is already pending, either
        starts a connection attempt (when not connected) or enqueues a
        ``ReadDevice`` command (when connected). The loop then waits up to two
        seconds on ``self.stopped`` and exits as soon as it is set.
        """
        while True:
            if not self.connecting.is_set():
                if self.connected.is_set():
                    # necessary for incoming notifications to work correctly
                    self.enqueue_command("ReadDevice")
                else:
                    try:
                        self.try_connect()
                    except Exception:
                        # A failed attempt must not kill this thread,
                        # otherwise no further reconnects would happen.
                        self._log.exception("connection attempt failed")

            if self.stopped.wait(2):
                break

    def try_connect(self):
        """Start an asynchronous connection attempt.

        Sets ``connecting``, terminates a still-existing worker thread,
        re-reads the Gammu configuration, and initiates the worker. The
        outcome is reported later through ``_on_init_complete``.

        Raises:
            Exception: Whatever the Gammu calls raise; ``connecting`` is
                cleared before the exception is re-raised.
        """
        self.connecting.set()
        try:
            if self._gammu_worker._thread is not None:
                self._gammu_worker.terminate()

            self._state_machine.ReadConfig()
            self._gammu_worker.configure(self._state_machine.GetConfig())

            self._log.debug("connecting...")
            self._gammu_worker.initiate()
        except Exception:
            # The "Init" result that normally clears the flag may never
            # arrive, so clear it here to allow another attempt.
            self.connecting.clear()
            raise

    def cleanup(self):
        """Run the base class cleanup, then disconnect from the device."""
        super().cleanup()
        self._disconnect()

    def enqueue_command(self, command, params=None, callback=None):
        """Queue a Gammu command on the worker.

        Waits up to five seconds for the connection to be established. If it
        is not, the command is dropped and ``callback`` (when given) is called
        with the error ``"ERR_NOTCONNECTED"``.

        Args:
            command: Name of the Gammu command.
            params: Optional parameters forwarded to the worker.
            callback: Optional callable invoked as
                ``callback(name, result, error, percents)``. It is stored
                under the command name, so it replaces any callback stored
                earlier for the same command.
        """
        self._log.debug("-> [command={}] [params={}]".format(command, params))

        if not self.connected.wait(timeout=5):
            # Log even without a callback so dropped commands are visible.
            self._log.error("<- ERR_NOTCONNECTED: ({} {})".format(command, params))
            if callback is not None:
                callback(command, None, "ERR_NOTCONNECTED", 100)
            return

        if callback is not None:
            self._user_callbacks[command] = callback

        self._gammu_worker.enqueue_command(command, params)

    def bind(self, event, callback):
        """Register ``callback`` for ``event``.

        ``event`` is either a command name, in which case the callback is
        invoked as ``callback(name, result, error, percents)``, or one of the
        event classes ``IncomingCallEvent`` / ``EndCallEvent``, in which case
        it is invoked with a single event instance.
        """
        self._user_callbacks[event] = callback

    def _disconnect(self):
        """Terminate the worker thread (best effort) and clear ``connected``."""
        try:
            if self._gammu_worker._thread is not None:
                self._log.info("disconnecting...")
                self._gammu_worker.terminate()
        except Exception:
            # Disconnecting is best effort: record the failure but still
            # mark the controller as disconnected below.
            self._log.warning("error while terminating gammu worker", exc_info=True)

        self.connected.clear()

    def _on_gammu_result(self, name, result, error, percents):
        """Dispatch a worker result to the system and user callbacks.

        Results are ignored once ``self.stopped`` is set. The error value
        ``"ERR_NONE"`` is normalised to ``None`` before dispatching. Any
        error starting with ``"ERR_DEVICE"`` triggers a disconnect.
        """
        if self.stopped.is_set():
            return

        if error == "ERR_NONE":
            error = None

        self._log.debug("<- {0} ({1:d}%): \"{2}\"".format(name, percents, result if error is None else error))

        # System callbacks run first; the user table is looked up afterwards.
        for callbacks in (self._system_callbacks, self._user_callbacks):
            handler = callbacks.get(name)
            if handler is not None:
                handler(name, result, error, percents)

        if error is not None and error.startswith("ERR_DEVICE"):
            self._disconnect()

    def _on_init_complete(self, name, result, error, percents):
        """Handle the result of the ``Init`` command.

        On success, marks the controller as connected and enables incoming
        call, SMS and USSD notifications. On failure, logs the error. In both
        cases ``connecting`` is cleared.
        """
        if error is not None:
            self._log.error("connection failed: {}".format(error))
            self.connecting.clear()
            return

        self._log.info("connected!")
        self.connected.set()
        self.connecting.clear()

        enqueue = self._gammu_worker.enqueue_command
        enqueue("SetIncomingCall", (True,))
        enqueue("SetIncomingCallback", (self._on_incoming_call,))
        enqueue("SetIncomingSMS", (True,))
        enqueue("SetIncomingCallback", (self._on_incoming_sms,))
        enqueue("SetIncomingUSSD", (True,))
        enqueue("SetIncomingCallback", (self._on_incoming_ussd,))

    def _on_dial_voice_complete(self, name, result, error, percents):
        """Mark a call as ongoing when ``DialVoice`` finished without error."""
        if error is None and not self.ongoing_call.is_set():
            self.ongoing_call.set()

    def _on_answer_call_complete(self, name, result, error, percents):
        """Clear the ongoing-call flag when ``AnswerCall`` reported an error."""
        if error is not None and self.ongoing_call.is_set():
            self.ongoing_call.clear()

    def _on_incoming_call(self, state_machine, event_type, data):
        """Translate Gammu call notifications into call events.

        An ``"IncomingCall"`` status while no call is ongoing sets
        ``ongoing_call`` and notifies the ``IncomingCallEvent`` callback. A
        ``"CallEnd"`` or ``"CallLocalEnd"`` status while a call is ongoing
        clears the flag and notifies the ``EndCallEvent`` callback.
        """
        self._log.debug("<- [event_type={}] [data={}]".format(event_type, data))

        status = data["Status"]
        if status == "IncomingCall" and not self.ongoing_call.is_set():
            self.ongoing_call.set()
            handler = self._user_callbacks.get(IncomingCallEvent)
            if handler is not None:
                handler(IncomingCallEvent(data["Number"]))
        elif status in ("CallEnd", "CallLocalEnd") and self.ongoing_call.is_set():
            self.ongoing_call.clear()
            handler = self._user_callbacks.get(EndCallEvent)
            if handler is not None:
                handler(EndCallEvent(data["Number"]))

    def _on_incoming_sms(self, state_machine, event_type, data):
        """Log an incoming SMS notification; no further handling is done."""
        self._log.debug("<- [event_type={}] [data={}]".format(event_type, data))

    def _on_incoming_ussd(self, state_machine, event_type, data):
        """Log an incoming USSD notification; no further handling is done."""
        self._log.debug("<- [event_type={}] [data={}]".format(event_type, data))