/
service.py.jinja2
139 lines (120 loc) · 4.75 KB
/
service.py.jinja2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
{% if is_first_by_category %}
##
## service types (aka "APIs")
##
import abc
import uuid
from pprint import pformat
from typing import Optional

from autobahn.wamp.types import PublishOptions, SubscribeOptions
from autobahn.wamp.request import Publication
from autobahn.wamp.interfaces import ISession
from autobahn.xbr import IDelegate
{% endif %}
class {{ metadata.classname }}(object):
    """
    {{ metadata.docs }}

    @interface: "{{ metadata.attrs.uuid }}"
    """

    # Every attribute assigned on ``self`` must be declared here: direct
    # instances of a slotted class have no ``__dict__``, so assigning an
    # undeclared attribute raises ``AttributeError``.
    __slots__ = [
        'log',
        '_x_api_id',
        '_x_session',
        '_x_delegate',
        '_x_prefix',
    ]

    def __init__(self, log=None):
        """
        :param log: If provided, log to this logger, else create a new one internally.
        """
        if log:
            self.log = log
        else:
            import txaio
            self.log = txaio.make_logger()

        self._x_api_id = uuid.UUID('{{ metadata.attrs.uuid }}').bytes

        # All three are set together by attach() and cleared together by detach().
        self._x_session = None
        self._x_delegate = None
        self._x_prefix = None

    def session(self) -> Optional[ISession]:
        """
        WAMP session this API is attached to, or ``None`` when not attached.
        """
        return self._x_session

    def delegate(self) -> Optional[IDelegate]:
        """
        XBR (buyer/seller) delegate this API is attached to, or ``None`` when not attached.
        """
        return self._x_delegate

    def prefix(self) -> Optional[str]:
        """
        WAMP URI prefix under which this API is instantiated, or ``None`` when not attached.
        """
        return self._x_prefix

    def is_attached(self) -> bool:
        """
        Flag indicating whether this API instance is currently attached to a session/delegate.

        :returns: ``True`` when session, delegate and prefix are all set,
            ``False`` when none of them is set.
        :raises AssertionError: If only some of the three are set (inconsistent state).
        """
        attached = (
            self._x_session is not None,
            self._x_delegate is not None,
            self._x_prefix is not None,
        )
        if all(attached):
            return True
        if not any(attached):
            return False
        # Raised explicitly (rather than ``assert False``) so the check is not
        # stripped when Python runs with ``-O``.
        raise AssertionError('logic error - should not arrive here')

    def is_attched(self) -> bool:
        """
        Misspelled alias of :meth:`is_attached`, kept for backward compatibility.
        """
        return self.is_attached()

    async def attach(self, session: ISession, delegate: IDelegate, prefix: str):
        """
        Attach this API instance with the given session and delegate, and under the given WAMP URI prefix.

        For every call of type ``topic`` in the interface, an event handler is
        subscribed on the session under ``<prefix>.<call name>``. The handler
        unwraps the event payload via the delegate and forwards the parsed
        object to the matching ``receive_*`` method.

        :param session: WAMP session this API instance is attached to.
        :param delegate: XBR (buyer/seller) delegate used by this API instance.
        :param prefix: WAMP URI prefix under which this API instance is attached to.
        """
        assert self._x_session is None
        assert self._x_delegate is None
        assert self._x_prefix is None

        self._x_session = session
        self._x_delegate = delegate
        self._x_prefix = prefix

{# NOTE(review): the payload type "HomeDevice" is hard-coded in the handlers and signatures below; presumably it should be derived from the call metadata - confirm. #}
{% for call_name in metadata.calls_by_id %}
{% if metadata.calls[call_name].attrs['type'] == 'topic' %}
        async def do_receive_{{ call_name }}(key_id, enc_ser, ciphertext, details=None):
            # NOTE(review): event details and the decrypted payload are printed to stdout.
            print('Received event {}, encrypted with key_id={}'.format(details.publication, uuid.UUID(bytes=key_id)))
            try:
                payload = await self._x_delegate.unwrap(key_id, enc_ser, ciphertext)
                obj = HomeDevice.parse(payload)
            except Exception:
                # Not a bare ``except:`` so that task cancellation and
                # interpreter-exit signals are not swallowed here.
                self.log.failure()
            else:
                print('Unencrypted event payload: {}'.format(pformat(obj)))
                self.receive_{{ call_name }}(obj)

        topic = '{}.{{ call_name }}'.format(self._x_prefix)
        # Subscribe through the attached session; this class has no subscribe() of its own.
        await self._x_session.subscribe(do_receive_{{ call_name }}, topic, options=SubscribeOptions(details=True))

{% endif %}
{% endfor %}
    async def detach(self):
        """
        Detach this API instance from the session and delegate.

        If the session is still attached, it is left (``session.leave()``)
        before the session, delegate and prefix references are cleared.
        """
        assert self._x_session is not None
        assert self._x_delegate is not None
        assert self._x_prefix is not None

        if self._x_session.is_attached():
            await self._x_session.leave()

        self._x_session = None
        self._x_delegate = None
        self._x_prefix = None

{% for call_name in metadata.calls_by_id %}
{% if metadata.calls[call_name].attrs['type'] == 'topic' %}
    async def publish_{{ call_name }}(self, device: HomeDevice) -> Publication:
        """
        Publish - {{ metadata.calls[call_name].docs }}

        The marshaled object is wrapped by the delegate and published, with
        acknowledgement, to ``<prefix>.{{ call_name }}``.

        :param device: Object to publish; its ``marshal()`` result is used as the payload.
        :returns: The publication returned by the session for the acknowledged publish.
        """
        assert self._x_session is not None and self._x_session.is_attached()
        assert self._x_delegate is not None

        topic = '{}.{{ call_name }}'.format(self._x_prefix)
        payload = device.marshal()
        key_id, enc_ser, ciphertext = await self._x_delegate.wrap(self._x_api_id, topic, payload)
        pub = await self._x_session.publish(topic, key_id, enc_ser, ciphertext,
                                            options=PublishOptions(acknowledge=True))
        return pub

    def receive_{{ call_name }}(self, device: HomeDevice):
        """
        Receive - {{ metadata.calls[call_name].docs }}

        Called by the event handler subscribed in :meth:`attach` with the
        parsed event payload. Override in a subclass to handle the event.

        :param device: Parsed event payload.
        :raises NotImplementedError: Always, unless overridden.
        """
        raise NotImplementedError('event handler for "{{ call_name }}" not implemented')

{% endif %}
{% endfor %}