-
Notifications
You must be signed in to change notification settings - Fork 9
/
client.py
361 lines (301 loc) · 15.7 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
# Buttplug Python
# Client Module
# Copyright 2019 Nonpolynomial
# 3-Clause BSD Licensed
from .connector import (ButtplugClientConnector,
ButtplugClientConnectorObserver,
ButtplugClientConnectorError)
from ..core import (ButtplugMessage, StartScanning, StopScanning, Ok,
RequestServerInfo, Error, ServerInfo,
ButtplugMessageError, RequestLog, DeviceAdded,
DeviceList, DeviceRemoved, ScanningFinished, DeviceInfo,
MessageAttributes, VibrateCmd, SpeedSubcommand,
RequestDeviceList, RotateSubcommand, LinearSubcommand,
RotateCmd, LinearCmd, StopDeviceCmd, ButtplugErrorCode,
ButtplugError, Log, ButtplugDeviceError,
ButtplugHandshakeError, ButtplugPingError,
ButtplugUnknownError)
from ..utils import EventHandler
from typing import Dict, List, Tuple, Union
from asyncio import Future, get_event_loop
import logging, sys
class ButtplugClient(ButtplugClientConnectorObserver):
    """Used to connect to Buttplug Servers.

    Attributes:
        name (string):
            name of the client, which the server can use to show
            with connection status.
        connector (ButtplugClientConnector):
            connector currently in use, or None before connect() is called.
        devices (Dict[int, ButtplugClientDevice]):
            dict of devices currently connected to the Buttplug Server,
            indexed by their server-provisioned numerical index.
        device_added_handler (buttplug.utils.EventHandler):
            Takes functions of the format f(a: ButtplugClientDevice) -> void.
            Calls handlers whenever a new device is found by the Buttplug
            Server.
        device_removed_handler (buttplug.utils.EventHandler):
            Takes functions of the format f(a: ButtplugClientDevice) -> void.
            Calls handlers whenever a device has disconnected from the
            Buttplug server.
        scanning_finished_handler (buttplug.utils.EventHandler):
            Takes functions of the format f() -> void. Calls handlers
            whenever the server has finished scanning for devices.
        log_handler (buttplug.utils.EventHandler):
            Takes functions of the format f(a: Log) -> void. Calls handlers
            whenever a new log message is received.
    """

    def __init__(self, name: str):
        self.name: str = name
        self.connector: ButtplugClientConnector = None
        self.devices: Dict[int, ButtplugClientDevice] = {}
        self.scanning_finished_handler: EventHandler = EventHandler(self)
        self.device_added_handler: EventHandler = EventHandler(self)
        self.device_removed_handler: EventHandler = EventHandler(self)
        self.log_handler: EventHandler = EventHandler(self)
        # Futures awaiting a reply, keyed by the id of the outgoing message.
        self._msg_tasks: Dict[int, Future] = {}
        # Id handed to the next outgoing message; starts at 1.
        self._msg_counter: int = 1

    async def connect(self, connector):
        """Connects to a Buttplug Server, using the connector passed to it.

        Registers this client as an observer of the connector, opens the
        connection, then performs the handshake and initial device list
        request (see _init).

        Args:
            connector (ButtplugConnector):
                Connector to use to contact the server.

        Returns:
            void:
                Should just return on successful connect.

        Raises:
            buttplug.client.ButtplugClientConnectorError:
                On failed connect. Check message for context.
        """
        self.connector = connector
        self.connector.add_observer(self)
        await self.connector.connect()
        await self._init()

    async def _init(self):
        """Send RequestServerInfo, then fetch and register the device list."""
        initmsg = RequestServerInfo(self.name)
        msg: ServerInfo = await self._send_message_expect_reply(initmsg,
                                                                ServerInfo)
        logging.info("Connected to server: %s", msg.server_name)
        dl: DeviceList = await self._send_message_expect_reply(
            RequestDeviceList(), DeviceList)
        self._handle_device_list(dl)

    def _handle_device_list(self, dl: DeviceList):
        """Create a client device for every entry in dl and fire the
        device_added_handler for each of them."""
        for dev in dl.devices:
            device = ButtplugClientDevice(self, dev)
            self.devices[dev.device_index] = device
            self.device_added_handler(device)

    async def disconnect(self):
        """Disconnect from the remote server.

        Does nothing when connect() was never called or the connector
        already reports that it is not connected.
        """
        # connector stays None until connect() is called; treat that the same
        # as "already disconnected" instead of raising AttributeError.
        if self.connector is None or not self.connector.connected:
            return
        await self.connector.disconnect()

    async def start_scanning(self):
        """Request that the server starts scanning for devices.
        """
        await self._send_message_expect_ok(StartScanning())

    async def stop_scanning(self):
        """Request that the server stops scanning for devices.
        """
        await self._send_message_expect_ok(StopScanning())

    async def request_log(self, log_level: str):
        """Request that the server sends logs at the requested level or higher
        to the client.

        To stop logs from being sent, call request_log again with the "Off"
        level.

        Args:
            log_level (string):
                Log level to receive. Send "Off" to stop logs from being
                sent.
        """
        await self._send_message_expect_ok(RequestLog(log_level))

    def _next_msg_id(self) -> int:
        """Return the next outgoing message id and advance the counter."""
        msg_id = self._msg_counter
        self._msg_counter += 1
        return msg_id

    async def _send_message(self, msg: ButtplugMessage):
        """Stamp msg with a fresh id and send it through the connector."""
        msg.id = self._next_msg_id()
        await self.connector.send(msg)

    async def _parse_message(self, msg: ButtplugMessage):
        """Dispatch a message that is not a reply to a pending request.

        Handles DeviceAdded, DeviceRemoved, ScanningFinished and Log; any
        other message type is ignored.
        """
        if isinstance(msg, DeviceAdded):
            device = ButtplugClientDevice(self, msg)
            self.devices[msg.device_index] = device
            self.device_added_handler(device)
        elif isinstance(msg, DeviceRemoved):
            removed_device = self.devices.pop(msg.device_index, None)
            if removed_device is None:
                # A removal for an index we never tracked must not raise
                # KeyError out of the message handler.
                logging.warning("DeviceRemoved for unknown device index %s",
                                msg.device_index)
                return
            self.device_removed_handler(removed_device)
        elif isinstance(msg, ScanningFinished):
            self.scanning_finished_handler()
        elif isinstance(msg, Log):
            # Hand the received message to the handlers, not the Log class.
            self.log_handler(msg)

    # What kinda typing should expectedClass be here? Could we make this a
    # generic function?
    async def _send_message_expect_reply(self,
                                         msg: ButtplugMessage,
                                         expectedClass) -> ButtplugMessage:
        """Send msg and wait for the reply carrying the same id.

        Args:
            msg (ButtplugMessage):
                Message to send; its id is overwritten with a fresh one.
            expectedClass:
                Class (or tuple of classes) the reply must be an instance
                of.

        Returns:
            ButtplugMessage:
                The reply, guaranteed to be an instance of expectedClass.

        Raises:
            ButtplugClientConnectorError:
                If there is no connector or it is not connected.
            ButtplugError (or a subclass):
                If the server answered with an Error message.
            ButtplugMessageError:
                If the reply is neither expectedClass nor Error.
        """
        if self.connector is None or not self.connector.connected:
            raise ButtplugClientConnectorError(
                "Client not connected to server")
        msg.id = self._next_msg_id()
        reply_future = get_event_loop().create_future()
        # Register the future BEFORE sending so a reply that arrives while
        # the send is still being awaited finds it.
        self._msg_tasks[msg.id] = reply_future
        try:
            await self.connector.send(msg)
            retmsg = await reply_future
        finally:
            # Drop the bookkeeping entry on every path (success, send
            # failure, cancellation) so the table does not grow forever.
            self._msg_tasks.pop(msg.id, None)
        if not isinstance(retmsg, expectedClass):
            if isinstance(retmsg, Error):
                # This will always throw
                self._throw_error_msg_exception(retmsg)
            raise ButtplugMessageError(
                "Unexpected message: {}".format(retmsg))
        return retmsg

    async def _send_message_expect_ok(self, msg: ButtplugMessage) -> None:
        """Send msg and require an Ok reply; see _send_message_expect_reply
        for the exceptions raised."""
        await self._send_message_expect_reply(msg, Ok)

    async def _handle_message(self, msg: ButtplugMessage):
        """Route an incoming message.

        A message whose id matches a pending request resolves that request's
        future; everything else goes to _parse_message.
        NOTE(review): presumably called by the connector through the
        observer interface - confirm against ButtplugClientConnectorObserver.
        """
        pending = self._msg_tasks.pop(msg.id, None)
        if pending is not None:
            # The waiter may have been cancelled in the meantime; setting a
            # result on a finished future would raise InvalidStateError.
            if not pending.done():
                pending.set_result(msg)
            return
        await self._parse_message(msg)

    def _throw_error_msg_exception(self, msg: Error):
        """Raise the exception class matching msg.error_code.

        Always raises; unrecognised codes raise the ButtplugError base class.
        """
        exception_by_code = {
            ButtplugErrorCode.ERROR_UNKNOWN: ButtplugUnknownError,
            ButtplugErrorCode.ERROR_DEVICE: ButtplugDeviceError,
            ButtplugErrorCode.ERROR_MSG: ButtplugMessageError,
            ButtplugErrorCode.ERROR_PING: ButtplugPingError,
            ButtplugErrorCode.ERROR_INIT: ButtplugHandshakeError,
        }
        raise exception_by_code.get(msg.error_code, ButtplugError)(msg)
class ButtplugClientDevice(object):
    """Represents a device that is connected to the Buttplug Server.

    Attributes:
        name (string):
            Name of the device
        allowed_messages (Dict[str, MessageAttributes]):
            Dictionary that matches message names to attributes. For
            instance, if a device can vibrate, it will have a dictionary
            entry for "VibrateCmd", as well as a MessageAttribute for
            "FeatureCount" that says how many vibrators are in the device.
    """

    def __init__(self, client: ButtplugClient,
                 device_msg: Union[DeviceInfo, DeviceAdded]):
        """Build a device from a DeviceInfo or DeviceAdded message.

        Args:
            client (ButtplugClient):
                Client used to send this device's commands.
            device_msg (Union[DeviceInfo, DeviceAdded]):
                Message describing the device (name, index, messages).

        Raises:
            ButtplugDeviceError:
                If device_msg is neither a DeviceInfo nor a DeviceAdded.
        """
        if not isinstance(device_msg, (DeviceInfo, DeviceAdded)):
            # Instances have no __name__; ask the type for it so the intended
            # error is raised instead of an AttributeError.
            raise ButtplugDeviceError(
                "Cannot create device from message {}".format(
                    type(device_msg).__name__))
        self._client = client
        self.name = device_msg.device_name
        self._index = device_msg.device_index
        logging.debug(device_msg.device_messages)
        self.allowed_messages: Dict[str, MessageAttributes] = {
            msg_name: MessageAttributes(attrs.get("FeatureCount"))
            for msg_name, attrs in device_msg.device_messages.items()
        }

    def _require_message(self, msg_name: str) -> None:
        """Raise ButtplugDeviceError unless the device lists msg_name."""
        if msg_name not in self.allowed_messages:
            raise ButtplugDeviceError(
                "{} not supported by device".format(msg_name))

    @staticmethod
    def _indexed(values, single_type) -> list:
        """Normalise a command argument into (index, value) pairs.

        A lone value (instance of single_type) maps to index 0, a list is
        enumerated, a dict contributes its items. Any other type yields an
        empty list, so an empty command is sent.
        """
        if isinstance(values, single_type):
            return [(0, values)]
        if isinstance(values, list):
            return list(enumerate(values))
        if isinstance(values, dict):
            return list(values.items())
        return []

    async def send_vibrate_cmd(self, speeds: Union[float,
                                                   List[float],
                                                   Dict[int, float]]):
        """Tell the server to make a device vibrate at a certain speed. 0.0
        for speed or using send_stop_device_cmd will stop the hardware from
        vibrating.

        Args:
            speeds (Union[float, List[float], Dict[int, float]]):
                Speed, or speeds, to set the vibrators to, assuming the
                hardware supports vibration. Range is from 0.0 <= x <= 1.0.
                Types accepted:
                - a single float (or int), which is sent as the speed for
                  motor index 0 only
                - a list of floats, mapping to the motor indexes in the
                  hardware, i.e. [0.5, 1.0] will set motor 0 to 0.5, motor 1
                  to 1.
                - a dict of int to float, which maps motor index to speed.
                  i.e. { 0: 0.5, 1: 1.0 } will set motor 0 to 0.5, motor 1
                  to 1.

        Raises:
            ButtplugDeviceError:
                If the device does not list "VibrateCmd".
        """
        self._require_message("VibrateCmd")
        speeds_obj = [SpeedSubcommand(index, speed)
                      for index, speed in self._indexed(speeds, (float, int))]
        await self._client._send_message_expect_ok(
            VibrateCmd(self._index, speeds_obj))

    async def send_rotate_cmd(self,
                              rotations: Union[Tuple[float, bool],
                                               List[Tuple[float, bool]],
                                               Dict[int, Tuple[float, bool]]]):
        """Tell the server to make a device rotate at a certain speed. 0.0 for
        speed or using send_stop_device_cmd will stop the hardware from
        rotating.

        Args:
            rotations (Union[Tuple[float, bool], List[Tuple[float, bool]], Dict[int, Tuple[float, bool]]]):
                Rotation speed(s) and directions, to set the hardware to,
                assuming the hardware supports rotation. Range is from 0.0
                <= x <= 1.0 for speeds. For bool, True is clockwise
                direction, False is counterclockwise.
                Types accepted:
                - a Tuple of [float, bool], which is sent for rotator index
                  0 only
                - a list of Tuple[float, bool], mapping to the rotator
                  indexes in the hardware, i.e. [(0.5, False), (1.0, True)]
                  will set motor 0 to 50% speed going counterclockwise,
                  motor 1 to 100% speed going clockwise.
                - a dict of int to Tuple[float, bool], mapping rotator
                  indexes in the hardware, i.e. { 0: (0.5, False), 1: (1.0,
                  True)} will set motor 0 to 50% speed going
                  counterclockwise, motor 1 to 100% speed going clockwise.

        Raises:
            ButtplugDeviceError:
                If the device does not list "RotateCmd".
        """
        self._require_message("RotateCmd")
        rotations_obj = [RotateSubcommand(index, rot[0], rot[1])
                         for index, rot in self._indexed(rotations, tuple)]
        await self._client._send_message_expect_ok(
            RotateCmd(self._index, rotations_obj))

    async def send_linear_cmd(self,
                              linear: Union[Tuple[int, float],
                                            List[Tuple[int, float]],
                                            Dict[int, Tuple[int, float]]]):
        """Tell the server to make a device stroke (move linearly) at a
        certain speed. Use StopDeviceCmd to stop the device from moving.

        Args:
            linear (Union[Tuple[int, float], List[Tuple[int, float]], Dict[int, Tuple[int, float]]]):
                Linear position(s) and movement duration(s), to set the
                hardware to, assuming the hardware supports linear movement.
                Position range is from 0.0 <= x <= 1.0. Duration is in
                milliseconds, 1000ms = 1s.
                Types accepted:
                - a Tuple of [int, float], which is sent for linear index 0
                  only.
                - a list of Tuple[int, float], mapping to the linear indexes
                  in the hardware, i.e. [(1000, 0.9), (500, 0.1)] will set
                  linear movement 0 to 90% position and move to it over 1s,
                  while linear movement 1 will move to 10% position over
                  0.5s
                - a dict of Tuple[int, float], mapping to the linear indexes
                  in the hardware, i.e. {0: (1000, 0.9), 1: (500, 0.1)} will
                  set linear movement 0 to 90% position and move to it over
                  1s, while linear movement 1 will move to 10% position over
                  0.5s

        Raises:
            ButtplugDeviceError:
                If the device does not list "LinearCmd".
        """
        self._require_message("LinearCmd")
        linear_obj = [LinearSubcommand(index, move[0], move[1])
                      for index, move in self._indexed(linear, tuple)]
        await self._client._send_message_expect_ok(
            LinearCmd(self._index, linear_obj))

    async def send_stop_device_cmd(self):
        """Tell the server to stop whatever device movements may be happening.
        """
        await self._client._send_message_expect_ok(StopDeviceCmd(self._index))