Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added VariableRateCyclicTaskABC and updated ThreadBasedCyclicSendTask #1733

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
147 changes: 105 additions & 42 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ def __init__(
:raises ValueError: If the given messages are invalid
"""
messages = self._check_and_convert_messages(messages)

# Take the Arbitration ID of the first element
self.arbitration_id = messages[0].arbitration_id
self.msgs_len = len(messages)
self.messages = messages
# Take the Arbitration ID of each message and put them into a list
self.arbitration_id = [self.messages[idx].arbitration_id for idx in range(self.msgs_len)]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer self.arbitration_ids

self.period = period
self.period_ns = int(round(period * 1e9))
self.messages = messages
self.msg_index = 0

@staticmethod
def _check_and_convert_messages(
Expand All @@ -81,8 +82,7 @@ def _check_and_convert_messages(
"""Helper function to convert a Message or Sequence of messages into a
tuple, and raises an error when the given value is invalid.

Performs error checking to ensure that all Messages have the same
arbitration ID and channel.
Performs error checking to ensure that all Messages have the same channel.

Should be called when the cyclic task is initialized.

Expand All @@ -97,12 +97,6 @@ def _check_and_convert_messages(
raise ValueError("Must be at least a list or tuple of length 1")
messages = tuple(messages)

all_same_id = all(
message.arbitration_id == messages[0].arbitration_id for message in messages
)
if not all_same_id:
raise ValueError("All Arbitration IDs should be the same")

all_same_channel = all(
message.channel == messages[0].channel for message in messages
)
Expand Down Expand Up @@ -154,16 +148,17 @@ def _check_modified_messages(self, messages: Tuple[Message, ...]) -> None:

:raises ValueError: If the given messages are invalid
"""
if len(self.messages) != len(messages):
if self.msgs_len != len(messages):
raise ValueError(
"The number of new cyclic messages to be sent must be equal to "
"the number of messages originally specified for this task"
)
if self.arbitration_id != messages[0].arbitration_id:
raise ValueError(
"The arbitration ID of new cyclic messages cannot be changed "
"from when the task was created"
)
for idx in range(self.msgs_len):
if self.arbitration_id[idx] != messages[idx].arbitration_id:
raise ValueError(
"The arbitration ID of new cyclic messages cannot be changed "
"from when the task was created"
)
Comment on lines +156 to +161
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tried but do you know if socketcan supports reordering the messages as well? In which case prefer to count the arbitration ids match.


def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:
"""Update the contents of the periodically sent messages, without
Expand All @@ -185,6 +180,68 @@ def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:

self.messages = messages

class VariableRateCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
"""A Cyclic task that supports a group period and intra-message period."""
def _check_and_apply_period_intra(
self, period_intra: Optional[float]
) -> None:
"""
Helper function that checks if the given period_intra is valid and applies the
variable rate attributes to be used in the cyclic task.

:param period_intra:
The period in seconds to send intra-message.

:raises ValueError: If the given period_intra is invalid
"""
self._is_variable_rate = False
self._run_cnt_msgs = [0]
self._run_cnt_max = 1
self._run_cnt = 0

if period_intra is not None:
if not isinstance(period_intra, float):
raise ValueError("period_intra must be a float")
if period_intra <= 0:
raise ValueError("period_intra must be greater than 0")
if self.msgs_len <= 1:
raise ValueError("period_intra can only be used with multiple messages")
if period_intra*self.msgs_len >= self.period:
raise ValueError("period_intra per intra-message must be less than period")
period_ms = int(round(self.period * 1000, 0))
period_intra_ms = int(round(period_intra * 1000, 0))
(_run_period_ms, msg_cnts, group_cnts) = self._find_gcd(period_ms, period_intra_ms)
self._is_variable_rate = True
self._run_cnt_msgs = [i*msg_cnts for i in range(self.msgs_len)]
self._run_cnt_max = group_cnts
self._run_cnt = 0
# Override period, period_ms, and period_ns to be the variable period
self.period = _run_period_ms / 1000
self.period_ms = _run_period_ms
self.period_ns = _run_period_ms * 1000000

@staticmethod
def _find_gcd(
period_ms: int,
period_intra_ms: int,
) -> Tuple[int, int, int]:
"""
Helper function that finds the greatest common divisor between period_ms and period_intra_ms.

:returns:
Tuple of (gcd_ms, m_steps, n_steps)
* gcd_ms: greatest common divisor in milliseconds
* m_steps: number of steps to send intra-message
* n_steps: number of steps to send message group
"""
gcd_ms = min(period_ms, period_intra_ms)
while gcd_ms > 1:
if period_ms % gcd_ms == 0 and period_intra_ms % gcd_ms == 0:
break
gcd_ms -= 1
m_steps = int(period_intra_ms / gcd_ms)
n_steps = int(period_ms / gcd_ms)
return (gcd_ms, m_steps, n_steps)

class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
"""A Cyclic send task that supports switches send frequency after a set time."""
Expand Down Expand Up @@ -214,7 +271,7 @@ def __init__(


class ThreadBasedCyclicSendTask(
LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC
LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC, VariableRateCyclicTaskABC
):
"""Fallback cyclic send task using daemon thread."""

Expand All @@ -227,6 +284,7 @@ def __init__(
duration: Optional[float] = None,
on_error: Optional[Callable[[Exception], bool]] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
period_intra: Optional[float] = None,
) -> None:
"""Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`.

Expand All @@ -253,9 +311,11 @@ def __init__(
)
self.on_error = on_error
self.modifier_callback = modifier_callback
self._check_and_apply_period_intra(period_intra)

if USE_WINDOWS_EVENTS:
self.period_ms = int(round(period * 1000, 0))
if not self._is_variable_rate:
self.period_ms = int(round(period * 1000, 0))
try:
self.event = win32event.CreateWaitableTimerEx(
None,
Expand Down Expand Up @@ -289,41 +349,41 @@ def start(self) -> None:
self.thread.start()

def _run(self) -> None:
msg_index = 0
self.msg_index = 0
msg_due_time_ns = time.perf_counter_ns()

if USE_WINDOWS_EVENTS:
# Make sure the timer is non-signaled before entering the loop
win32event.WaitForSingleObject(self.event.handle, 0)

while not self.stopped:
msg_send = (self._run_cnt in self._run_cnt_msgs) if self._is_variable_rate else True
if self.end_time is not None and time.perf_counter() >= self.end_time:
break

# Prevent calling bus.send from multiple threads
with self.send_lock:
try:
if self.modifier_callback is not None:
self.modifier_callback(self.messages[msg_index])
self.bus.send(self.messages[msg_index])
except Exception as exc: # pylint: disable=broad-except
log.exception(exc)

# stop if `on_error` callback was not given
if self.on_error is None:
self.stop()
raise exc

# stop if `on_error` returns False
if not self.on_error(exc):
self.stop()
break
if msg_send:
# Prevent calling bus.send from multiple threads
with self.send_lock:
try:
if self.modifier_callback is not None:
self.modifier_callback(self.messages[self.msg_index])
self.bus.send(self.messages[self.msg_index])
except Exception as exc: # pylint: disable=broad-except
log.exception(exc)

# stop if `on_error` callback was not given
if self.on_error is None:
self.stop()
raise exc

# stop if `on_error` returns False
if not self.on_error(exc):
self.stop()
break
self.msg_index = (self.msg_index + 1) % self.msgs_len

if not USE_WINDOWS_EVENTS:
msg_due_time_ns += self.period_ns

msg_index = (msg_index + 1) % len(self.messages)

if USE_WINDOWS_EVENTS:
win32event.WaitForSingleObject(
self.event.handle,
Expand All @@ -334,3 +394,6 @@ def _run(self) -> None:
delay_ns = msg_due_time_ns - time.perf_counter_ns()
if delay_ns > 0:
time.sleep(delay_ns / NANOSECONDS_IN_SECOND)

if self._is_variable_rate:
self._run_cnt = (self._run_cnt + 1) % self._run_cnt_max
17 changes: 16 additions & 1 deletion can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ def send_periodic(
duration: Optional[float] = None,
store_task: bool = True,
modifier_callback: Optional[Callable[[Message], None]] = None,
period_intra: Optional[float] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.

Expand All @@ -235,6 +236,10 @@ def send_periodic(
Function which should be used to modify each message's data before
sending. The callback modifies the :attr:`~can.Message.data` of the
message and returns ``None``.
:param period_intra:
Period in seconds between each message when sending multiple messages
in a sequence. If not provided, the period will be used for each
message.
Comment on lines +239 to +242
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Period in seconds between individual message within the sequence.

I assume no impact for sending individual messages? Could you also update the docstring for period 🙏🏼

:return:
A started task instance. Note the task can be stopped (and depending on
the backend modified) by calling the task's
Expand Down Expand Up @@ -266,7 +271,7 @@ def send_periodic(
# Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later
task = cast(
_SelfRemovingCyclicTask,
self._send_periodic_internal(msgs, period, duration, modifier_callback),
self._send_periodic_internal(msgs, period, duration, modifier_callback, period_intra),
)
# we wrap the task's stop method to also remove it from the Bus's list of tasks
periodic_tasks = self._periodic_tasks
Expand Down Expand Up @@ -294,6 +299,7 @@ def _send_periodic_internal(
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
period_intra: Optional[float] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Default implementation of periodic message sending using threading.

Expand All @@ -306,6 +312,14 @@ def _send_periodic_internal(
:param duration:
The duration between sending each message at the given rate. If
no duration is provided, the task will continue indefinitely.
:param modifier_callback:
Function which should be used to modify each message's data before
sending. The callback modifies the :attr:`~can.Message.data` of the
message and returns ``None``.
:param period_intra:
Period in seconds between each message when sending multiple messages
in a sequence. If not provided, the period will be used for each
message.
:return:
A started task instance. Note the task can be stopped (and
depending on the backend modified) by calling the
Expand All @@ -323,6 +337,7 @@ def _send_periodic_internal(
period=period,
duration=duration,
modifier_callback=modifier_callback,
period_intra=period_intra,
)
return task

Expand Down
8 changes: 5 additions & 3 deletions can/interfaces/socketcan/socketcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
LimitedDurationCyclicSendTaskABC,
ModifiableCyclicTaskABC,
RestartableCyclicTaskABC,
VariableRateCyclicTaskABC,
)
from can.interfaces.socketcan import constants
from can.interfaces.socketcan.utils import find_available_interfaces, pack_filters
Expand Down Expand Up @@ -303,7 +304,7 @@ def _compose_arbitration_id(message: Message) -> int:


class CyclicSendTask(
LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC
LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC, VariableRateCyclicTaskABC
):
"""
A SocketCAN cyclic send task supports:
Expand All @@ -320,6 +321,7 @@ def __init__(
messages: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
period_intra: Optional[float] = None,
) -> None:
"""Construct and :meth:`~start` a task.

Expand All @@ -339,7 +341,6 @@ def __init__(
# - self.period
# - self.duration
super().__init__(messages, period, duration)

self.bcm_socket = bcm_socket
self.task_id = task_id
self._tx_setup(self.messages)
Expand Down Expand Up @@ -809,6 +810,7 @@ def _send_periodic_internal(
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
period_intra: Optional[float] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.

Expand Down Expand Up @@ -849,7 +851,7 @@ def _send_periodic_internal(
msgs_channel = str(msgs[0].channel) if msgs[0].channel else None
bcm_socket = self._get_bcm_socket(msgs_channel or self.channel)
task_id = self._get_next_task_id()
task = CyclicSendTask(bcm_socket, task_id, msgs, period, duration)
task = CyclicSendTask(bcm_socket, task_id, msgs, period, duration, period_intra)
return task

# fallback to thread based cyclic task
Expand Down