Skip to content

Commit

Permalink
Add support for multiple Cyclic Messages in Task
Browse files Browse the repository at this point in the history
This adds support for multiple Cyclic Messages in a Cyclic Task. The
default implementation is also changed to provide support for this, by
iterating over the list of Cyclic Messages. In order to maintain
backwards compatibility, the periodic APIs now take a CAN Message as
before, in addition to a Sequence of CAN Messages.

The SocketCAN interface takes advantage of the Linux BCM APIs to do so,
while the IXXAT interface maintains its original behaviour.

This also introduces a new example that illustrates how to use Cyclic
Messages, backed by the SocketCAN interface.

We previously tracked the can_id and arbitration_id class members due to
the ongoing deprecation of the can_id Message attribute. Now that can_id
is replaced by arbitration_id, we no longer need this in
CyclicSendTaskABC either. As such, this removes the deprecated can_id
member from the Cyclic Task.

Fixes #606
  • Loading branch information
karlding committed Jun 18, 2019
1 parent a26c209 commit b12ea74
Show file tree
Hide file tree
Showing 6 changed files with 877 additions and 100 deletions.
127 changes: 98 additions & 29 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import threading
import time

import can

log = logging.getLogger("can.bcm")


Expand All @@ -34,28 +36,65 @@ class CyclicSendTaskABC(CyclicTask):
Message send task with defined period
"""

def __init__(self, message, period):
def __init__(self, messages, period):
"""
:param can.Message message: The message to be sent periodically.
:param float period: The rate in seconds at which to send the message.
:param Union[Sequence[can.Message], can.Message] messages:
The messages to be sent periodically.
:param float period: The rate in seconds at which to send the messages.
"""
self.message = message
self.can_id = message.arbitration_id
self.arbitration_id = message.arbitration_id
messages = self._check_and_convert_messages(messages)

# Take the Arbitration ID of the first element
self.arbitration_id = messages[0].arbitration_id
self.period = period
super().__init__()
self.messages = messages

@staticmethod
def _check_and_convert_messages(messages):
"""Helper function to convert a Message or Sequence of messages into a
tuple, and raises an error when the given value is invalid.
Performs error checking to ensure that all Messages have the same
arbitration ID and channel.
Should be called when the cyclic task is initialized
"""
if not isinstance(messages, (list, tuple)):
if isinstance(messages, can.Message):
messages = [messages]
else:
raise ValueError("Must be either a list, tuple, or a Message")
if not messages:
raise ValueError("Must be at least a list or tuple of length 1")
messages = tuple(messages)

all_same_id = all(
message.arbitration_id == messages[0].arbitration_id for message in messages
)
if not all_same_id:
raise ValueError("All Arbitration IDs should be the same")

all_same_channel = all(
message.channel == messages[0].channel for message in messages
)
if not all_same_channel:
raise ValueError("All Channel IDs should be the same")

return messages


class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC):
def __init__(self, message, period, duration):
def __init__(self, messages, period, duration):
"""Message send task with a defined duration and period.
:param can.Message message: The message to be sent periodically.
:param float period: The rate in seconds at which to send the message.
:param Union[Sequence[can.Message], can.Message] messages:
The messages to be sent periodically.
:param float period: The rate in seconds at which to send the messages.
:param float duration:
The duration to keep sending this message at given rate.
Approximate duration in seconds to continue sending messages. If
no duration is provided, the task will continue indefinitely.
"""
super().__init__(message, period)
super().__init__(messages, period)
self.duration = duration


Expand All @@ -71,44 +110,72 @@ def start(self):
class ModifiableCyclicTaskABC(CyclicSendTaskABC):
"""Adds support for modifying a periodic message"""

def modify_data(self, message):
"""Update the contents of this periodically sent message without altering
the timing.
def _check_modified_messages(self, messages):
"""Helper function to perform error checking when modifying the data in
the cyclic task.
:param can.Message message:
The message with the new :attr:`can.Message.data`.
Note: The arbitration ID cannot be changed.
Performs error checking to ensure the arbitration ID and the number of
cyclic messages hasn't changed.
Should be called when modify_data is called in the cyclic task.
"""
if len(self.messages) != len(messages):
raise ValueError(
"The number of new cyclic messages to be sent must be equal to "
"the number of messages originally specified for this task"
)
if self.arbitration_id != messages[0].arbitration_id:
raise ValueError(
"The arbitration ID of new cyclic messages cannot be changed "
"from when the task was created"
)
return messages

def modify_data(self, messages):
"""Update the contents of the periodically sent messages, without
altering the timing.
:param Union[Sequence[can.Message], can.Message] messages:
The messages with the new :attr:`can.Message.data`.
Note: The arbitration ID cannot be changed.
Note: The number of new cyclic messages to be sent must be equal
to the original number of messages originally specified for this
task.
"""
self.message = message
messages = self._check_and_convert_messages(messages)
messages = self._check_modified_messages(messages)
self.messages = messages


class MultiRateCyclicSendTaskABC(CyclicSendTaskABC):
"""A Cyclic send task that supports switches send frequency after a set time.
"""

def __init__(self, channel, message, count, initial_period, subsequent_period):
def __init__(self, channel, messages, count, initial_period, subsequent_period):
"""
Transmits a message `count` times at `initial_period` then continues to
transmit message at `subsequent_period`.
transmit messages at `subsequent_period`.
:param channel: See interface specific documentation.
:param can.Message message:
:param Union[Sequence[can.Message], can.Message] messages:
:param int count:
:param float initial_period:
:param float subsequent_period:
"""
super().__init__(channel, message, subsequent_period)
super().__init__(channel, messages, subsequent_period)


class ThreadBasedCyclicSendTask(
ModifiableCyclicTaskABC, LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC
):
"""Fallback cyclic send task using thread."""

def __init__(self, bus, lock, message, period, duration=None):
super().__init__(message, period, duration)
def __init__(self, bus, lock, messages, period, duration=None):
super().__init__(messages, period, duration)
self.bus = bus
self.lock = lock
self.send_lock = lock
self.stopped = True
self.thread = None
self.end_time = time.time() + duration if duration else None
Expand All @@ -120,23 +187,25 @@ def stop(self):
def start(self):
self.stopped = False
if self.thread is None or not self.thread.is_alive():
name = "Cyclic send task for 0x%X" % (self.message.arbitration_id)
name = "Cyclic send task for 0x%X" % (self.messages[0].arbitration_id)
self.thread = threading.Thread(target=self._run, name=name)
self.thread.daemon = True
self.thread.start()

def _run(self):
msg_index = 0
while not self.stopped:
# Prevent calling bus.send from multiple threads
with self.lock:
with self.send_lock:
started = time.time()
try:
self.bus.send(self.message)
self.bus.send(self.messages[msg_index])
except Exception as exc:
log.exception(exc)
break
if self.end_time is not None and time.time() >= self.end_time:
break
msg_index = (msg_index + 1) % len(self.messages)
# Compensate for the time it takes to send the message
delay = self.period - (time.time() - started)
time.sleep(max(0.0, delay))
44 changes: 27 additions & 17 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

from abc import ABCMeta, abstractmethod
import can
import logging
import threading
from time import time
Expand Down Expand Up @@ -163,8 +164,8 @@ def send(self, msg, timeout=None):
"""
raise NotImplementedError("Trying to write to a readonly bus?")

def send_periodic(self, msg, period, duration=None, store_task=True):
"""Start sending a message at a given period on this bus.
def send_periodic(self, msgs, period, duration=None, store_task=True):
"""Start sending messages at a given period on this bus.
The task will be active until one of the following conditions are met:
Expand All @@ -174,12 +175,12 @@ def send_periodic(self, msg, period, duration=None, store_task=True):
- :meth:`BusABC.stop_all_periodic_tasks()` is called
- the task's :meth:`CyclicTask.stop()` method is called.
:param can.Message msg:
Message to transmit
:param Union[Sequence[can.Message], can.Message] msgs:
Messages to transmit
:param float period:
Period in seconds between each message
:param float duration:
The duration to keep sending this message at given rate. If
Approximate duration in seconds to continue sending messages. If
no duration is provided, the task will continue indefinitely.
:param bool store_task:
If True (the default) the task will be attached to this Bus instance.
Expand All @@ -191,18 +192,26 @@ def send_periodic(self, msg, period, duration=None, store_task=True):
.. note::
Note the duration before the message stops being sent may not
Note the duration before the messages stop being sent may not
be exactly the same as the duration specified by the user. In
general the message will be sent at the given rate until at
least **duration** seconds.
.. note::
For extremely long running Bus instances with many short lived tasks the default
api with ``store_task==True`` may not be appropriate as the stopped tasks are
still taking up memory as they are associated with the Bus instance.
For extremely long running Bus instances with many short lived
tasks the default api with ``store_task==True`` may not be
appropriate as the stopped tasks are still taking up memory as they
are associated with the Bus instance.
"""
task = self._send_periodic_internal(msg, period, duration)
if not isinstance(msgs, (list, tuple)):
if isinstance(msgs, can.Message):
msgs = [msgs]
else:
raise ValueError("Must be either a list, tuple, or a Message")
if not msgs:
raise ValueError("Must be at least a list or tuple of length 1")
task = self._send_periodic_internal(msgs, period, duration)
# we wrap the task's stop method to also remove it from the Bus's list of tasks
original_stop_method = task.stop

Expand All @@ -221,21 +230,22 @@ def wrapped_stop_method(remove_task=True):

return task

def _send_periodic_internal(self, msg, period, duration=None):
def _send_periodic_internal(self, msgs, period, duration=None):
"""Default implementation of periodic message sending using threading.
Override this method to enable a more efficient backend specific approach.
:param can.Message msg:
Message to transmit
:param Union[Sequence[can.Message], can.Message] msgs:
Messages to transmit
:param float period:
Period in seconds between each message
:param float duration:
The duration to keep sending this message at given rate. If
The duration between sending each message at the given rate. If
no duration is provided, the task will continue indefinitely.
:return:
A started task instance. Note the task can be stopped (and depending on
the backend modified) by calling the :meth:`stop` method.
A started task instance. Note the task can be stopped (and
depending on the backend modified) by calling the :meth:`stop`
method.
:rtype: can.broadcastmanager.CyclicSendTaskABC
"""
if not hasattr(self, "_lock_send_periodic"):
Expand All @@ -244,7 +254,7 @@ def _send_periodic_internal(self, msg, period, duration=None):
threading.Lock()
) # pylint: disable=attribute-defined-outside-init
task = ThreadBasedCyclicSendTask(
self, self._lock_send_periodic, msg, period, duration
self, self._lock_send_periodic, msgs, period, duration
)
return task

Expand Down
19 changes: 12 additions & 7 deletions can/interfaces/ixxat/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,20 +716,25 @@ def shutdown(self):
class CyclicSendTask(LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC):
"""A message in the cyclic transmit list."""

def __init__(self, scheduler, msg, period, duration, resolution):
super().__init__(msg, period, duration)
def __init__(self, scheduler, msgs, period, duration, resolution):
super().__init__(msgs, period, duration)
if len(self.messages) != 1:
raise ValueError(
"IXXAT Interface only supports periodic transmission of 1 element"
)

self._scheduler = scheduler
self._index = None
self._count = int(duration / period) if duration else 0

self._msg = structures.CANCYCLICTXMSG()
self._msg.wCycleTime = int(round(period * resolution))
self._msg.dwMsgId = msg.arbitration_id
self._msg.dwMsgId = self.messages[0].arbitration_id
self._msg.uMsgInfo.Bits.type = constants.CAN_MSGTYPE_DATA
self._msg.uMsgInfo.Bits.ext = 1 if msg.is_extended_id else 0
self._msg.uMsgInfo.Bits.rtr = 1 if msg.is_remote_frame else 0
self._msg.uMsgInfo.Bits.dlc = msg.dlc
for i, b in enumerate(msg.data):
self._msg.uMsgInfo.Bits.ext = 1 if self.messages[0].is_extended_id else 0
self._msg.uMsgInfo.Bits.rtr = 1 if self.messages[0].is_remote_frame else 0
self._msg.uMsgInfo.Bits.dlc = self.messages[0].dlc
for i, b in enumerate(self.messages[0].data):
self._msg.abData[i] = b
self.start()

Expand Down
Loading

0 comments on commit b12ea74

Please sign in to comment.