Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class CyclicTask(object):
@abc.abstractmethod
def stop(self):
"""Cancel this periodic task.

:raises can.CanError:
If stop is called on an already stopped task.
"""


Expand Down
30 changes: 26 additions & 4 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,7 @@ def send_periodic(self, msg, period, duration=None, store_task=True):
api with ``store_task==True`` may not be appropriate as the stopped tasks are
still taking up memory as they are associated with the Bus instance.
"""
if not hasattr(self, "_lock_send_periodic"):
# Create a send lock for this bus
self._lock_send_periodic = threading.Lock()
task = ThreadBasedCyclicSendTask(self, self._lock_send_periodic, msg, period, duration)
task = self._send_periodic_internal(msg, period, duration)
# we wrap the task's stop method to also remove it from the Bus's list of tasks
original_stop_method = task.stop

Expand All @@ -213,8 +210,33 @@ def wrapped_stop_method(remove_task=True):
pass
original_stop_method()
task.stop = wrapped_stop_method

if store_task:
self._periodic_tasks.append(task)

return task

def _send_periodic_internal(self, msg, period, duration=None):
"""Default implementation of periodic message sending using threading.

Override this method to enable a more efficient backend specific approach.

:param can.Message msg:
Message to transmit
:param float period:
Period in seconds between each message
:param float duration:
The duration to keep sending this message at given rate. If
no duration is provided, the task will continue indefinitely.
:return:
A started task instance. Note the task can be stopped (and depending on
the backend modified) by calling the :meth:`stop` method.
:rtype: can.broadcastmanager.CyclicSendTaskABC
"""
if not hasattr(self, "_lock_send_periodic"):
# Create a send lock for this bus
self._lock_send_periodic = threading.Lock()
task = ThreadBasedCyclicSendTask(self, self._lock_send_periodic, msg, period, duration)
return task

def stop_all_periodic_tasks(self, remove_tasks=True):
Expand Down
2 changes: 1 addition & 1 deletion can/interfaces/ixxat/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ def send(self, msg, timeout=None):
else:
_canlib.canChannelPostMessage(self._channel_handle, message)

def send_periodic(self, msg, period, duration=None):
def _send_periodic_internal(self, msg, period, duration=None):
"""Send a message using built-in cyclic transmit list functionality."""
if self._scheduler is None:
self._scheduler = HANDLE()
Expand Down
3 changes: 1 addition & 2 deletions can/interfaces/socketcan/socketcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ def _send_once(self, data, channel=None):
raise can.CanError("Failed to transmit: %s" % exc)
return sent

def send_periodic(self, msg, period, duration=None):
def _send_periodic_internal(self, msg, period, duration=None):
"""Start sending a message at a given period on this bus.

The kernel's broadcast manager will be used.
Expand Down Expand Up @@ -598,7 +598,6 @@ def send_periodic(self, msg, period, duration=None):
"""
bcm_socket = self._get_bcm_socket(msg.channel or self.channel)
task = CyclicSendTask(bcm_socket, msg, period, duration)
self._periodic_tasks.append(task)
return task

def _get_bcm_socket(self, channel):
Expand Down
22 changes: 11 additions & 11 deletions doc/development.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ Creating a new interface/backend

These steps are a guideline on how to add a new backend to python-can.

- Create a module (either a ``*.py`` or an entire subdirctory depending
- Create a module (either a ``*.py`` or an entire subdirectory depending
on the complexity) inside ``can.interfaces``
- Implement the central part of the backend: the bus class that extends
:class:`can.BusABC`. See below for more info on this one!
- Register your backend bus class in ``can.interface.BACKENDS`` and
``can.interfaces.VALID_INTERFACES``.
- Add docs where appropiate, like in ``doc/interfaces.rst`` and add
an entry in ``doc/interface/*``.
Update ``doc/scripts.rst`` accordingly.
- Add tests in ``test/*`` where appropiate.
``can.interfaces.VALID_INTERFACES`` in ``can.interfaces.__init__.py``.
- Add docs where appropriate. At a minimum add to ``doc/interfaces.rst`` and add
a new interface specific document in ``doc/interface/*``.
- Update ``doc/scripts.rst`` accordingly.
- Add tests in ``test/*`` where appropriate.


About the ``BusABC`` class
Expand All @@ -59,15 +59,15 @@ They *might* implement the following:
messages yet to be sent
* :meth:`~can.BusABC.shutdown` to override how the bus should
shut down
* :meth:`~can.BusABC.send_periodic` to override the software based
periodic sending and push it down to the kernel or hardware
* :meth:`~can.BusABC._send_periodic_internal` to override the software based
periodic sending and push it down to the kernel or hardware.
* :meth:`~can.BusABC._apply_filters` to apply efficient filters
to lower level systems like the OS kernel or hardware
to lower level systems like the OS kernel or hardware.
* :meth:`~can.BusABC._detect_available_configs` to allow the interface
to report which configurations are currently available for new
connections
connections.
* :meth:`~can.BusABC.state` property to allow reading and/or changing
the bus state
the bus state.

.. note::

Expand Down
12 changes: 8 additions & 4 deletions examples/cyclic.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@ def simple_periodic_send(bus):
def limited_periodic_send(bus):
print("Starting to send a message every 200ms for 1s")
msg = can.Message(arbitration_id=0x12345678, data=[0, 0, 0, 0, 0, 0], extended_id=True)
task = bus.send_periodic(msg, 0.20, 1)
task = bus.send_periodic(msg, 0.20, 1, store_task=False)
if not isinstance(task, can.LimitedDurationCyclicSendTaskABC):
print("This interface doesn't seem to support a ")
task.stop()
return

time.sleep(1.5)
print("stopped cyclic send")
time.sleep(2)
print("Cyclic send should have stopped as duration expired")
# Note the (finished) task will still be tracked by the Bus
# unless we pass `store_task=False` to bus.send_periodic
# alternatively calling stop removes the task from the bus
#task.stop()


def test_periodic_send_with_modifying_data(bus):
Expand Down Expand Up @@ -106,7 +110,7 @@ def test_periodic_send_with_modifying_data(bus):
reset_msg = can.Message(arbitration_id=0x00, data=[0, 0, 0, 0, 0, 0], extended_id=False)

for interface, channel in [
('socketcan', 'can0'),
('socketcan', 'vcan0'),
#('ixxat', 0)
]:
print("Carrying out cyclic tests with {} interface".format(interface))
Expand Down