-
Notifications
You must be signed in to change notification settings - Fork 9
/
scene.py
588 lines (502 loc) · 24.9 KB
/
scene.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
import asyncio
import json
import os
import re
import sys
from datetime import datetime
from inspect import signature
import threading
from .arena_mqtt import ArenaMQTT
from .attributes import *
from .events import *
from .objects import *
from .utils import Utils, ArenaTelemetry, ArenaCmdInterpreter
from .env import (
SCENE,
PROGRAM_OBJECT_ID
)
class Scene(ArenaMQTT):
"""
Gives access to an ARENA scene.
Can create and execute various user-defined functions/tasks.
:param str host: Hostname of the ARENA webserver (required).
:param str realm: Reserved topic fork for future use (optional).
:param str namespace: Username of authenticated user or other namespace (automatic).
:param str scene: The name of the scene, without namespace (required).
"""
def __init__(
self,
host = "arenaxr.org",
realm = "realm",
network_latency_interval = 10000, # run network latency update every 10s
on_msg_callback = None,
new_obj_callback = None,
user_join_callback = None,
user_left_callback = None,
delete_obj_callback = None,
end_program_callback = None,
video = False,
debug = False,
cli_args = False,
**kwargs
):
# init telemetry
self.telemetry = ArenaTelemetry()
# setup event to let others wait on connection
self.connected_evt = threading.Event()
# start the command interpreter (if enabled by env variable)
self.cmd_interpreter = ArenaCmdInterpreter(self, show_attrs=('config_data', 'scene', 'users', 'all_objects', 'stats'),
get_callables=('persisted_objs', 'persisted_scene_option', 'writable_scenes', 'user_list'),
start_cmd_event=self.connected_evt)
if cli_args:
self.args = self.parse_cli()
if self.args["host"]:
kwargs["host"] = self.args["host"]
if self.args["namespace"]:
kwargs["namespace"] = self.args["namespace"]
if self.args["scene"]:
kwargs["scene"] = self.args["scene"]
if self.args["debug"]:
debug = self.args["debug"]
if os.environ.get(SCENE):
self.scene = os.environ[SCENE]
print(f"Using Scene from 'SCENE' env variable: {self.scene}")
elif "scene" in kwargs and kwargs["scene"]:
if re.search("/", kwargs["scene"]):
self.exit("Scene argument (scene) cannot include '/', aborting...")
self.scene = kwargs["scene"]
print(f"Using Scene from 'scene' input parameter: {self.scene}")
else:
self.exit("Scene argument (scene) is unspecified or None, aborting...")
super().__init__(
host,
realm,
network_latency_interval,
on_msg_callback,
end_program_callback,
video,
debug,
**kwargs
)
with self.telemetry.start_span(f"init {self.namespace}/{self.scene}") as span:
# 'init' span will track the remainder of the initialization
# create a program object to describe this program
# PROGRAM_OBJECT_ID allows to match the object id of persisted program object
# when a program object with PROGRAM_OBJECT_ID is loaded from persist, it will replace this one
self.program = Program(object_id=os.environ.get(PROGRAM_OBJECT_ID),
name=f"{self.namespace}/{self.scene}",
filename=sys.argv[0],
filetype="PY")
self.persist_host = self.config_data["ARENADefaults"]["persistHost"]
self.persist_path = self.config_data["ARENADefaults"]["persistPath"]
self.persist_url = f"https://{self.persist_host}{self.persist_path}{self.namespaced_target}"
# set up callbacks
self.new_obj_callback = new_obj_callback
self.delete_obj_callback = delete_obj_callback
self.user_join_callback = user_join_callback
self.user_left_callback = user_left_callback
self.unspecified_object_ids = set() # objects that exist in the scene,
# but this scene instance does not
# have a reference to
self.users = {} # dict of all users
# Always use the the hostname specified by the user, or defaults.
print(f"Loading: https://{self.web_host}/{self.namespace}/{self.scene}, realm={self.realm}")
span.add_event("Init Done.")
def exit(self, arg=0):
"""Custom exit to push errors to telemetry"""
if arg != 0:
error_msg = f"Exiting with sys.exit('{arg}')"
self.telemetry.set_error(error_msg)
sys.exit(arg)
def on_connect(self, client, userdata, flags, rc):
super().on_connect(client, userdata, flags, rc)
if rc == 0:
# set event
self.connected_evt.set()
# create arena-py Objects from persist server
# no need to return anything here
self.get_persisted_objs()
async def process_message(self):
while True:
try:
msg = await self.msg_queue.get()
except RuntimeError as e:
print(f"Ignoring error: {e}")
return
# extract payload
try:
payload_str = msg.payload.decode("utf-8", "ignore")
payload = json.loads(payload_str)
except Exception as e:
print("Malformed payload, ignoring:")
print(e)
return
try:
# update object attributes, if possible
if "object_id" in payload:
# parse payload
object_id = payload.get("object_id", None)
action = payload.get("action", None)
with self.telemetry.start_process_msg_span(object_id, action) as span:
data = payload.get("data", {})
object_type = data.get("object_type", None)
event = None
# create/get object from object_id
if object_id in self.all_objects:
obj = self.all_objects[object_id]
else:
ObjClass = OBJECT_TYPE_MAP.get(object_type, Object)
obj = ObjClass(**payload)
# react to action accordingly
if action:
if action == "clientEvent":
event = Event(**payload)
if obj.evt_handler:
self.callback_wrapper(obj.evt_handler, event, payload)
continue
span.add_event("Client event: {event}")
elif action == "delete":
if Camera.object_type in object_id: # object is a camera
if object_id in self.users:
if self.user_left_callback:
self.callback_wrapper(
self.user_left_callback,
self.users[object_id],
payload
)
del self.users[object_id]
elif HandLeft.object_type in object_id or HandRight.object_type in object_id: # object is a hand/controller
if "dep" in obj.data:
user_id = obj.data.dep
if user_id in self.users:
user = self.users[user_id]
if obj in user.hands.values():
if user.hand_remove_callback:
self.callback_wrapper(
user.hand_remove_callback,
obj,
payload
)
hand_key = HandLeft.object_type if HandLeft.object_type in object_id else HandRight.object_type
del user.hands[hand_key]
elif self.delete_obj_callback:
self.callback_wrapper(self.delete_obj_callback, obj, payload)
Object.remove(obj)
span.add_event("Object delete.")
continue
else: # create/update
obj.update_attributes(**payload)
span.add_event("Object attributes update.")
# call new message callback for all messages
if self.on_msg_callback:
if not event:
self.callback_wrapper(self.on_msg_callback, obj, payload)
else:
self.callback_wrapper(self.on_msg_callback, event, payload)
if object_type:
# run user_join_callback when user is found
if object_type == Camera.object_type:
if object_id not in self.users:
if object_id in self.all_objects:
self.users[object_id] = obj
else:
self.users[object_id] = Camera(**payload)
if self.user_join_callback:
self.callback_wrapper(
self.user_join_callback,
self.users[object_id],
payload
)
elif object_type == HandLeft.object_type or object_type == HandRight.object_type:
if "dep" in obj.data:
user_id = obj.data.dep
if user_id in self.users:
user = self.users[user_id]
if obj not in user.hands.values():
user.hands[object_type] = obj
obj.camera = user
if user.hand_found_callback:
self.callback_wrapper(
user.hand_found_callback,
obj,
payload
)
# if its an object the library has not seen before, call new object callback
elif object_id not in self.unspecified_object_ids and self.new_obj_callback:
self.callback_wrapper(self.new_obj_callback, obj, payload)
self.unspecified_object_ids.add(object_id)
span.add_event("New Object.")
span.add_event("Handle Msg Done.")
except Exception as e:
print("Something went wrong, ignoring:")
print(e)
print(payload)
def callback_wrapper(self, func, arg, msg):
"""Checks for number of arguments for callback"""
if len(signature(func).parameters) != 3:
print("[DEPRECATED]", "Callbacks and handlers now take 3 arguments: (scene, obj/evt, msg)!")
func(arg)
else:
func(self, arg, msg)
def generate_custom_event(self, evt, action="clientEvent"):
"""Publishes an custom event. Could be user or library defined"""
return self._publish(evt, action)
def generate_click_event(self, obj, type="mousedown", **kwargs):
"""Publishes an click event"""
_type = type
evt = Event(object_id=obj.object_id,
type=_type,
position=obj.data.position,
source=self.mqttc_id,
**kwargs)
return self.generate_custom_event(evt, action="clientEvent")
def manipulate_camera(self, cam, **kwargs):
"""Publishes a camera manipulation event"""
if kwargs.get("position"):
if isinstance(kwargs["position"], (list, tuple)):
kwargs["position"] = Position(*kwargs["position"])
elif isinstance(kwargs["position"], dict):
kwargs["position"] = Position(**kwargs["position"])
if kwargs.get("rotation"):
if isinstance(kwargs["rotation"], (list, tuple)):
kwargs["rotation"] = Rotation(*kwargs["rotation"])
elif isinstance(kwargs["rotation"], dict):
kwargs["rotation"] = Rotation(**kwargs["rotation"])
if isinstance(cam, Object):
object_id = cam.object_id
else:
object_id = cam
evt = Event(object_id=object_id,
type="camera-override",
object_type=Camera.object_type,
**kwargs)
return self.generate_custom_event(evt, action="update")
def look_at(self, cam, target):
"""Publishes a camera manipulation event"""
if isinstance(target, tuple) or isinstance(target, list):
target = Position(*target)
elif isinstance(target, dict):
target = Position(**target)
elif isinstance(target, Object):
target = target.object_id
if isinstance(cam, Object):
object_id = cam.object_id
else:
object_id = cam
evt = Event(object_id=object_id,
type="camera-override",
object_type="look-at",
target=target)
return self.generate_custom_event(evt, action="update")
@property
def all_objects(self):
"""Returns all the objects in a scene"""
return Object.all_objects
def add_object(self, obj):
"""Public function to create an object"""
res = self._publish(obj, "create")
self.run_animations(obj)
return res
def add_objects(self, objs):
"""Public function to create multiple objects in a list"""
for obj in objs:
self.add_object(obj)
return len(objs)
def update_object(self, obj, **kwargs):
"""Public function to update an object"""
if kwargs:
obj.update_attributes(**kwargs)
# Check if any keys in delayed_prop_tasks are pending new animations
# and cancel corresponding final update tasks or, if they are in
# kwarg property updates, cancel the task as well as the animation
need_to_run_animations = False
if len(obj.delayed_prop_tasks) > 0:
for anim in obj.animations:
if anim.property in obj.delayed_prop_tasks:
task_to_cancel = obj.delayed_prop_tasks[anim.property]
task_to_cancel.cancel()
del task_to_cancel
for k in kwargs:
if str(k) in obj.delayed_prop_tasks:
need_to_run_animations = True
task_to_cancel = obj.delayed_prop_tasks[k]
task_to_cancel.cancel()
del task_to_cancel
obj.dispatch_animation(
Animation(property=k, enabled=False, dur=0)
)
if need_to_run_animations:
self.run_animations(obj)
res = self._publish(obj, "update")
self.run_animations(obj)
return res
def update_objects(self, objs, **kwargs):
"""Public function to update multiple objects in a list"""
for obj in objs:
self.update_object(obj, **kwargs)
return len(objs)
def delete_object(self, obj):
"""Public function to delete an object"""
payload = {
"object_id": obj.object_id
}
Object.remove(obj)
return self._publish(payload, "delete", custom_payload=True)
def delete_attributes(self, obj, attributes=None):
"""Public function to delete a list of 'attributes' as a string[], updating each to null"""
updated_data = {}
for attr in attributes:
obj.data[attr] = None # remove from large internal storage
updated_data[attr] = None # remove from small external publish
payload = {
"object_id": obj.object_id,
"type": obj.type,
"persist": obj.persist,
"data": updated_data # dashes handled from string array
}
return self._publish(payload, "update", custom_payload=True)
def run_animations(self, obj):
"""Runs all dispatched animations"""
if obj.animations:
payload = {
"object_id": obj.object_id,
"type": obj.type,
"data": {"object_type": obj.data.object_type}
}
for i,animation in enumerate(obj.animations):
if isinstance(animation, AnimationMixer):
payload["data"][f"animation-mixer"] = vars(animation)
else:
anim = vars(animation).copy()
payload["data"][f"animation__{anim['property']}"] = anim
Utils.dict_key_replace(anim, "start", "from")
Utils.dict_key_replace(anim, "end", "to")
self.create_delayed_task(obj, anim)
obj.clear_animations()
return self._publish(payload, "update", custom_payload=True)
def create_delayed_task(self, obj, anim):
"""
Creates a delayed task to push the end state of an animation after the expected
duration. Uses async sleep to avoid blocking.
:param obj: arena object to update
:param anim: Animation to run
:return: created async task
"""
async def _delayed_task():
try:
sleep_dur = (anim.get('dur', 0) + anim.get('delay', 0)) / 1000
await asyncio.sleep(sleep_dur) # this is in seconds
final_state = anim["from"] if anim.get("dir", "normal") == "reverse"\
else anim["to"]
obj.update_attributes(**{anim["property"]: final_state})
self.update_object(obj)
obj.delayed_prop_tasks.pop(anim["property"], None)
except asyncio.CancelledError:
print("Animation end task cancelled for",
obj.object_id + "." + anim["property"])
if anim.get("loop", 0) or anim.get("enabled", True) is False:
return None
delayed_task = asyncio.create_task(_delayed_task())
obj.delayed_prop_tasks[anim["property"]] = delayed_task
delayed_task.object_id = obj.object_id
return delayed_task
def _publish(self, obj, action, custom_payload=False):
"""Publishes to mqtt broker with "action":action"""
with self.telemetry.start_publish_span(obj.object_id, action, obj.type):
if not self.can_publish:
print(f"ERROR: Publish failed! You do not have permission to publish to topic {self.root_topic} on {self.web_host}")
topic = f"{self.root_topic}/{self.mqttc_id}/{obj['object_id']}"
d = datetime.utcnow().isoformat()[:-3]+"Z"
if custom_payload:
payload = obj
payload["action"] = action
payload["timestamp"] = d
payload = json.dumps(payload)
else:
payload = obj.json(action=action, timestamp=d)
self.mqttc.publish(topic, payload, qos=0)
if self.debug: print("[publish]", topic, payload)
return payload
def get_persisted_obj(self, object_id):
"""Returns a dictionary for a persisted object.
If object is known by arena-py, return local object, not persisted
"""
obj = None
if object_id in self.all_objects:
obj = self.all_objects[object_id]
obj.persist = True
else:
# pass token to persist
data = self.auth.urlopen(url=f"{self.persist_url}/{object_id}", creds=True)
persist_obj = json.loads(data)
if len(obj) > 0:
obj = obj[0]
obj_id = persist_obj.get("object_id", object_id)
data = persist_obj.get("attributes", {})
object_type = data.get("object_type")
# special case for Program type
if persist_obj.get("type") == Program.object_type: object_type = Program.object_type
if object_type != None:
obj_class = OBJECT_TYPE_MAP.get(object_type, Object)
obj = obj_class(object_id=obj_id, data=data)
obj.persist = True
return obj
def get_persisted_objs(self):
"""Returns a dictionary of persisted objects.
If object is known by arena-py, return our local object, not persisted
Silently fails/skip objects without object_id and object_type (except programs)
Instanciates generic Object if object_type is given but unknown to arena-py
"""
objs = {}
# pass token to persist
data = self.auth.urlopen(url=self.persist_url, creds=True)
output = json.loads(data)
for obj in output:
# note: no exception on missing fields
object_id = obj.get("object_id")
data = obj.get("attributes", {})
object_type = data.get("object_type")
# special case for Program type
if obj.get("type") == Program.object_type:
object_type = Program.object_type
if object_id != None:
if object_id in self.all_objects:
# note: object from our list (not from persist)
persisted_obj = self.all_objects[object_id]
persisted_obj.persist = True
elif object_type != None:
# note: instanciate even with empty attributes if object_type is unknown to arena-py
obj_class = OBJECT_TYPE_MAP.get(object_type, Object)
persisted_obj = obj_class(object_id=object_id, data=data)
persisted_obj.persist = True
# replace program object, if matches our program id
if object_type == Program.object_type:
if os.environ.get(PROGRAM_OBJECT_ID) == object_id:
self.program = persisted_obj
objs[object_id] = persisted_obj
return objs
def get_persisted_scene_option(self):
"""Returns a dictionary for scene-options. [TODO] wrap the output as a BaseObject"""
scene_opts_url = f"{self.persist_url}?type=scene-options"
# pass token to persist
data = self.auth.urlopen(url=scene_opts_url, creds=True )
output = json.loads(data)
return output
def get_writable_scenes(self):
""" Request list of scene names for logged in user account that user has publish permission for.
Returns: list of scenes.
"""
return self.auth.get_writable_scenes(web_host=self.web_host)
def get_user_list(self):
"""Returns a list of users"""
return self.users.values()
def run_info_update(self, run_info):
"""Callbak when program stats are updated; publish program object update"""
# Add run info to program data object and publish program object update
run_info.add_program_info(self.program.data)
self.program.persist = True
self._publish(self.program, "update")
class Arena(Scene):
"""
Another name for Scene.
"""