/
_publisher.py
241 lines (203 loc) · 9.96 KB
/
_publisher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# Copyright (c) 2019 OpenCyphal
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko <pavel@opencyphal.org>
from __future__ import annotations
import typing
import logging
import asyncio
import pycyphal.util
import pycyphal.transport
from ._base import MessagePort, OutgoingTransferIDCounter, T, Closable
from ._base import DEFAULT_PRIORITY, PortFinalizer
from ._error import PortClosedError
# Module-level logger named after this module; used by both the publisher proxy and its shared implementation.
_logger = logging.getLogger(__name__)
class Publisher(MessagePort[T]):
    """
    A task should request its own independent publisher instance from the presentation layer controller.
    Do not share the same publisher instance across different tasks. This class implements the RAII pattern.

    Implementation info: all publishers sharing the same session specifier (i.e., subject-ID) also share the same
    underlying implementation object containing the transport session which is reference counted and destroyed
    automatically when the last publisher with that session specifier is closed;
    the user code cannot access it and generally shouldn't care.
    None of the settings of a publisher instance, such as send timeout or priority, can affect other publishers;
    this does not apply to the transfer-ID counter objects though because they are transport-layer entities
    and therefore are shared per session specifier.
    """

    DEFAULT_SEND_TIMEOUT = 1.0
    """
    Default value for :attr:`send_timeout`. The value is an implementation detail, not required by Specification.
    """

    def __init__(self, impl: PublisherImpl[T]):
        """
        Do not call this directly! Use :meth:`Presentation.make_publisher`.

        :param impl: The shared implementation object this proxy attaches itself to.
        """
        self._maybe_impl: typing.Optional[PublisherImpl[T]] = impl
        impl.register_proxy()  # Register ASAP to ensure correct finalization.

        # The following are cached on the proxy to permit their usage after close().
        self._dtype = impl.dtype
        self._transport_session = impl.transport_session
        self._transfer_id_counter = impl.transfer_id_counter

        # Per-proxy settings; they are never propagated to the shared implementation object.
        self._priority: pycyphal.transport.Priority = DEFAULT_PRIORITY
        self._send_timeout = self.DEFAULT_SEND_TIMEOUT

        # Strong references to the tasks spawned by publish_soon(). The event loop itself only keeps weak
        # references to tasks, so an otherwise unreferenced task may be garbage-collected before it completes.
        self._pending_tasks: typing.Set[asyncio.Future[None]] = set()

    @property
    def dtype(self) -> typing.Type[T]:
        """The message class this publisher was created for. Remains available after :meth:`close`."""
        return self._dtype

    @property
    def transport_session(self) -> pycyphal.transport.OutputSession:
        """The output transport session taken from the implementation. Remains available after :meth:`close`."""
        return self._transport_session

    @property
    def transfer_id_counter(self) -> OutgoingTransferIDCounter:
        """
        Allows the caller to reach the transfer-ID counter object of this session (shared per session specifier).
        This may be useful in certain special cases such as publication of time synchronization messages,
        where it may be necessary to override the transfer-ID manually.
        """
        return self._transfer_id_counter

    @property
    def priority(self) -> pycyphal.transport.Priority:
        """
        The priority level used for transfers published via this instance.
        This parameter is configured separately per proxy instance; i.e., it is not shared across different publisher
        instances under the same session specifier.
        """
        return self._priority

    @priority.setter
    def priority(self, value: pycyphal.transport.Priority) -> None:
        # Debug-only sanity check: assert statements are removed when Python runs with -O.
        assert value in pycyphal.transport.Priority
        self._priority = value

    @property
    def send_timeout(self) -> float:
        """
        Every outgoing transfer initiated via this proxy instance will have to be sent in this amount of time.
        If the time is exceeded, the attempt is aborted and False is returned. Read the transport layer docs for
        an in-depth information on send timeout handling.

        The default is :attr:`DEFAULT_SEND_TIMEOUT`.
        The publication logic is roughly as follows::

            return transport_session.send(message_transfer, asyncio.get_running_loop().time() + self.send_timeout)

        :raises ValueError: On assignment, if the new value is not a positive finite number.
        """
        return self._send_timeout

    @send_timeout.setter
    def send_timeout(self, value: float) -> None:
        value = float(value)
        # The chained comparison also rejects NaN because every comparison against NaN is false.
        if 0 < value < float("+inf"):
            self._send_timeout = value
        else:
            raise ValueError(f"Invalid send timeout value: {value}")

    async def publish(self, message: T) -> bool:
        """
        Serializes and publishes the message object at the priority level selected earlier.
        Should not be used simultaneously with :meth:`publish_soon` because that makes the message ordering undefined.

        :param message: The message object to publish.
        :returns: False if the publication could not be completed in :attr:`send_timeout`, True otherwise.
        :raises PortClosedError: If this publisher is closed or its implementation is no longer up.
        """
        self._require_usable()
        loop = asyncio.get_running_loop()
        assert self._maybe_impl
        # The relative timeout is converted into an absolute deadline on the running loop's clock.
        return await self._maybe_impl.publish(message, self._priority, loop.time() + self._send_timeout)

    def publish_soon(self, message: T) -> None:
        """
        Serializes and publishes the message object at the priority level selected earlier.
        Does so without blocking (observe that this method is not async).
        Should not be used simultaneously with :meth:`publish` because that makes the message ordering undefined.
        The send timeout is still in effect here -- if the operation cannot complete in the selected time,
        send will be cancelled and a low-severity log message will be emitted.

        The background task is referenced by this instance until it completes, so it cannot be garbage-collected
        while the publication is still in progress.

        :param message: The message object to publish.
        :raises PortClosedError: If this publisher is not usable at the time of the call.
        """

        async def executor() -> None:
            try:
                if not await self.publish(message):
                    _logger.info("%s send timeout", self)
            except Exception as ex:
                # A failure after close() is expected, so it is only reported at the debug level.
                if self._maybe_impl is not None:
                    _logger.exception("%s deferred publication has failed: %s", self, ex)
                else:
                    _logger.debug(
                        "%s deferred publication has failed but the publisher is already closed", self, exc_info=True
                    )

        self._require_usable()  # Detect errors as early as possible, do not wait for the task to start.
        task = asyncio.ensure_future(executor())
        # Keep the task alive until it is done; the done-callback then drops the reference again.
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)

    def close(self) -> None:
        """
        Detaches this proxy from the shared implementation. Calling it again has no further effect.
        Publications started via :meth:`publish_soon` that are still pending are not cancelled here.
        """
        # Swap first so that the implementation reference is already cleared when remove_proxy() is invoked.
        impl, self._maybe_impl = self._maybe_impl, None
        if impl is not None:
            impl.remove_proxy()

    def _require_usable(self) -> None:
        """
        :raises PortClosedError: If this proxy has been closed or the implementation reports that it is not up.
        """
        if self._maybe_impl is None or not self._maybe_impl.up:
            raise PortClosedError(repr(self))

    def __del__(self) -> None:
        if self._maybe_impl is not None:
            # https://docs.python.org/3/reference/datamodel.html#object.__del__
            # DO NOT invoke logging from the finalizer because it may resurrect the object!
            # Once it is resurrected, we may run into resource management issue if __del__() is invoked again.
            # Whether it is invoked the second time is an implementation detail.
            self._maybe_impl.remove_proxy()
            self._maybe_impl = None
class PublisherImpl(Closable, typing.Generic[T]):
    """
    The shared back end behind the publisher proxies; this is not a part of the library API.

    There is at most one such implementation per session specifier, and it may serve several users at once
    through the proxy class. Proxies announce themselves via :meth:`register_proxy` and leave via
    :meth:`remove_proxy`; once the last proxy is closed or garbage collected, the implementation closes itself.
    """

    def __init__(
        self,
        dtype: typing.Type[T],
        transport_session: pycyphal.transport.OutputSession,
        transfer_id_counter: OutgoingTransferIDCounter,
        finalizer: PortFinalizer,
    ):
        """
        :param dtype: The message class to publish; checked with ``nunavut_support.is_message_type``.
        :param transport_session: The output session that the transfers are sent through.
        :param transfer_id_counter: The source of transfer-ID values for the outgoing transfers.
        :param finalizer: Invoked once from :meth:`close` with a list containing the transport session.
        """
        import nunavut_support

        assert nunavut_support.is_message_type(dtype)

        self.dtype = dtype
        self.transport_session = transport_session
        self.transfer_id_counter = transfer_id_counter
        # A finalizer of None means that this instance has been closed already.
        self._maybe_finalizer: typing.Optional[PortFinalizer] = finalizer
        # Held for the entire duration of each publication, so publications through this object never overlap.
        self._lock = asyncio.Lock()
        self._proxy_count = 0
        # Latched when the transport session reports that it has been closed.
        self._underlying_session_closed = False

    async def publish(self, message: T, priority: pycyphal.transport.Priority, monotonic_deadline: float) -> bool:
        """
        Serializes the message and sends it as a single transfer through the transport session.

        :param message: An instance of :attr:`dtype`.
        :param priority: The priority assigned to the transfer.
        :param monotonic_deadline: Passed to the transport session's ``send`` as the deadline.
        :returns: Whatever the transport session's ``send`` returns.
        :raises TypeError: If the message is not an instance of :attr:`dtype`.
        :raises PortClosedError: If this implementation is not up by the time the lock is acquired.
        :raises pycyphal.transport.ResourceClosedError: Re-raised from the transport session after recording
            that the session is closed.
        """
        import nunavut_support

        if not isinstance(message, self.dtype):
            raise TypeError(f"Expected a message object of type {self.dtype}, found this: {message}")

        async with self._lock:
            if not self.up:
                raise PortClosedError(repr(self))

            # The timestamp is sampled before the serialization; the transfer-ID is only consumed after it.
            created_at = pycyphal.transport.Timestamp.now()
            payload = list(nunavut_support.serialize(message))
            outgoing = pycyphal.transport.Transfer(
                timestamp=created_at,
                priority=priority,
                transfer_id=self.transfer_id_counter.get_then_increment(),
                fragmented_payload=payload,
            )
            try:
                delivered = await self.transport_session.send(outgoing, monotonic_deadline)
            except pycyphal.transport.ResourceClosedError:
                # Remember the closure so that `up` reports False from now on.
                self._underlying_session_closed = True
                raise
            return delivered

    def register_proxy(self) -> None:
        """Accounts for one more proxy that uses this implementation."""
        self._proxy_count += 1
        _logger.debug("%s got a new proxy, new count %s", self, self._proxy_count)
        assert self.up, "Internal protocol violation"
        assert self._proxy_count >= 1

    def remove_proxy(self) -> None:
        """Accounts for one proxy less; closes this implementation when no proxies are left."""
        self._proxy_count -= 1
        _logger.debug("%s has lost a proxy, new count %s", self, self._proxy_count)
        no_proxies_left = self._proxy_count <= 0
        if no_proxies_left:
            self.close()  # RAII auto-close
        assert self._proxy_count >= 0

    @property
    def proxy_count(self) -> int:
        """Testing facilitation."""
        assert self._proxy_count >= 0
        return self._proxy_count

    def close(self) -> None:
        """Runs the finalizer with the transport session; does nothing if the finalizer has already been run."""
        finalizer = self._maybe_finalizer
        if finalizer is None:
            return
        finalizer([self.transport_session])
        # Cleared only after the call above has returned.
        self._maybe_finalizer = None

    @property
    def up(self) -> bool:
        """True until either this instance is closed or the transport session is found to be closed."""
        return not (self._maybe_finalizer is None or self._underlying_session_closed)

    def __repr__(self) -> str:
        import nunavut_support

        model_name = str(nunavut_support.get_model(self.dtype))
        return pycyphal.util.repr_attributes_noexcept(
            self,
            dtype=model_name,
            transport_session=self.transport_session,
            proxy_count=self._proxy_count,
        )