Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapter enumeration #1180

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,14 @@ def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]:
def fileno(self) -> int:
raise NotImplementedError("fileno is not implemented using current CAN bus")

@classmethod
def list_adapters(cls) -> List[Any]:
"""Lists all adapters for this interface. The adapter identifier can be used to open a specific adapter.

MAY NOT BE IMPLEMENTED BY ALL INTERFACES
"""
raise NotImplementedError()


class _SelfRemovingCyclicTask(CyclicSendTaskABC, ABC):
"""Removes itself from a bus.
Expand Down
2 changes: 1 addition & 1 deletion can/ctypesutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def map_symbol(
if sys.platform == "win32":
HRESULT = ctypes.HRESULT

elif sys.platform == "cygwin":
else:

class HRESULT(ctypes.c_long):
pass
Expand Down
5 changes: 3 additions & 2 deletions can/interfaces/ixxat/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""
Ctypes wrapper module for IXXAT Virtual CAN Interface V3 on win32 systems
Ctypes wrapper module for IXXAT Virtual CAN Interface on win32 systems

Copyright (C) 2016 Giuseppe Corbelli <giuseppe.corbelli@weightpack.com>
Copyright (c) 2021 Marcel Kanter <marcel.kanter@googlemail.com>
"""

from can.interfaces.ixxat.canlib import IXXATBus, get_ixxat_hwids
from can.interfaces.ixxat.canlib import IXXATBus
84 changes: 47 additions & 37 deletions can/interfaces/ixxat/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
LimitedDurationCyclicSendTaskABC,
RestartableCyclicTaskABC,
)
from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT as ctypes_HRESULT
from can.ctypesutil import CLibrary, HANDLE, PHANDLE, HRESULT

from . import constants, structures
from .exceptions import *
Expand Down Expand Up @@ -136,7 +136,7 @@ def __check_status(result, function, arguments):

# void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize);
_canlib.map_symbol(
"vciFormatError", None, (ctypes_HRESULT, ctypes.c_char_p, ctypes.c_uint32)
"vciFormatError", None, (HRESULT, ctypes.c_char_p, ctypes.c_uint32)
)
# Hack to have vciFormatError as a free function
vciFormatError = functools.partial(__vciFormatError, _canlib)
Expand Down Expand Up @@ -408,22 +408,28 @@ class IXXATBus(BusABC):
},
}

def __init__(self, channel, can_filters=None, **kwargs):
def __init__(self, channel=0, can_filters=None, **kwargs):
"""
:param int channel:
The Channel id to create this bus with.

:param list can_filters:
See :meth:`can.BusABC.set_filters`.

:param bool receive_own_messages:
Enable self-reception of sent messages.

:param int UniqueHardwareId:
UniqueHardwareId to connect (optional, will use the first found if not supplied)

:param int bitrate:
Channel bitrate in bit/s

:param String adapter:
adapter to connect (optional, will use the first found if not supplied)

:param int rxFifoSize:
Set the receive FIFO size to this value.

:param int txFifoSize:
Set the transmit FIFO size to this value.

:param bool receive_own_messages:
Enable self-reception of sent messages.
"""
if _canlib is None:
raise CanInterfaceNotImplementedError(
Expand All @@ -433,7 +439,7 @@ def __init__(self, channel, can_filters=None, **kwargs):
log.info("Got configuration of: %s", kwargs)
# Configuration options
bitrate = kwargs.get("bitrate", 500000)
UniqueHardwareId = kwargs.get("UniqueHardwareId", None)
adapter = kwargs.get("adapter", None)
rxFifoSize = kwargs.get("rxFifoSize", 16)
txFifoSize = kwargs.get("txFifoSize", 16)
self._receive_own_messages = kwargs.get("receive_own_messages", False)
Expand Down Expand Up @@ -461,31 +467,30 @@ def __init__(self, channel, can_filters=None, **kwargs):
self._payload = (ctypes.c_byte * 8)()

# Search for supplied device
if UniqueHardwareId is None:
log.info("Searching for first available device")
if adapter is None:
log.info("Searching for first available adapter")
else:
log.info("Searching for unique HW ID %s", UniqueHardwareId)
log.info("Searching for adapter %s", adapter)
_canlib.vciEnumDeviceOpen(ctypes.byref(self._device_handle))
while True:
try:
_canlib.vciEnumDeviceNext(
self._device_handle, ctypes.byref(self._device_info)
)
except StopIteration:
if UniqueHardwareId is None:
if adapter is None:
raise VCIDeviceNotFoundError(
"No IXXAT device(s) connected or device(s) in use by other process(es)."
)
else:
raise VCIDeviceNotFoundError(
"Unique HW ID {} not connected or not available.".format(
UniqueHardwareId
adapter
)
)
else:
if (UniqueHardwareId is None) or (
self._device_info.UniqueHardwareId.AsChar
== bytes(UniqueHardwareId, "ascii")
if (adapter is None) or (
self._device_info.UniqueHardwareId.AsChar == bytes(adapter, "ascii")
):
break
else:
Expand Down Expand Up @@ -762,6 +767,30 @@ def shutdown(self):
_canlib.canControlClose(self._control_handle)
_canlib.vciDeviceClose(self._device_handle)

@classmethod
def list_adapters(cls):
"""Get a list of hardware ids of all available IXXAT adapters."""
adapters = []
device_handle = HANDLE()
device_info = structures.VCIDEVICEINFO()

if _canlib is None:
raise CanInterfaceNotImplementedError(
"The IXXAT VCI library has not been initialized. Check the logs for more details."
)

_canlib.vciEnumDeviceOpen(ctypes.byref(device_handle))
while True:
try:
_canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info))
except StopIteration:
break
else:
adapters.append(device_info.UniqueHardwareId.AsChar.decode("ascii"))
_canlib.vciEnumDeviceClose(device_handle)

return adapters


class CyclicSendTask(LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC):
"""A message in the cyclic transmit list."""
Expand Down Expand Up @@ -827,22 +856,3 @@ def _format_can_status(status_flags: int):
return "CAN status message: {}".format(", ".join(states))
else:
return "Empty CAN status message"


def get_ixxat_hwids():
"""Get a list of hardware ids of all available IXXAT devices."""
hwids = []
device_handle = HANDLE()
device_info = structures.VCIDEVICEINFO()

_canlib.vciEnumDeviceOpen(ctypes.byref(device_handle))
while True:
try:
_canlib.vciEnumDeviceNext(device_handle, ctypes.byref(device_info))
except StopIteration:
break
else:
hwids.append(device_info.UniqueHardwareId.AsChar.decode("ascii"))
_canlib.vciEnumDeviceClose(device_handle)

return hwids
25 changes: 13 additions & 12 deletions doc/interfaces/ixxat.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
IXXAT Virtual CAN Interface
===========================

Interface to `IXXAT <http://www.ixxat.com/>`__ Virtual CAN Interface V3 SDK. Works on Windows.
Interface to `IXXAT <http://www.ixxat.com/>`__ Virtual CAN Interface SDK. Works on Windows.

The Linux ECI SDK is currently unsupported, however on Linux some devices are
supported with :doc:`socketcan`.
Expand All @@ -26,21 +26,21 @@ Bus

Configuration file
------------------

The simplest configuration file would be::

[default]
interface = ixxat
channel = 0

Python-can will search for the first IXXAT device available and open the first channel.
Python-can will search for the first IXXAT adapter available and open the first channel.
``interface`` and ``channel`` parameters are interpreted by frontend ``can.interfaces.interface``
module, while the following parameters are optional and are interpreted by IXXAT implementation.

* ``bitrate`` (default 500000) Channel bitrate
* ``UniqueHardwareId`` (default first device) Unique hardware ID of the IXXAT device
* ``adapter`` (default first adapter) Unique hardware ID of the IXXAT device
* ``rxFifoSize`` (default 16) Number of RX mailboxes
* ``txFifoSize`` (default 16) Number of TX mailboxes
* ``extended`` (default False) Allow usage of extended IDs


Filtering
Expand All @@ -55,14 +55,15 @@ VCI documentation, section "Message filters" for more info.

List available devices
----------------------
In case you have connected multiple IXXAT devices, you have to select them by using their unique hardware id.
To get a list of all connected IXXAT you can use the function ``get_ixxat_hwids()`` as demonstrated below:

>>> from can.interfaces.ixxat import get_ixxat_hwids
>>> for hwid in get_ixxat_hwids():
... print("Found IXXAT with hardware id '%s'." % hwid)
Found IXXAT with hardware id 'HW441489'.
Found IXXAT with hardware id 'HW107422'.

In case you have connected multiple IXXAT adapters, you have to select them by using their unique hardware id.
To get a list of all connected IXXAT adapters you can use the function ``list_adapters()`` as demonstrated below:

>>> from can.interfaces.ixxat import IXXATBus
>>> for hwid in IXXATBus.list_adapters():
... print("Found IXXAT adapter with hardware id '%s'." % hwid)
Found IXXAT adapter with hardware id 'HW441489'.
Found IXXAT adapter with hardware id 'HW107422'.


Internals
Expand Down
64 changes: 48 additions & 16 deletions test/test_interface_ixxat.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
"""
Unittest for ixxat interface.

Run only this test:
python setup.py test --addopts "--verbose -s test/test_interface_ixxat.py"
Copyright (c) 2020, 2021 Marcel Kanter <marcel.kanter@googlemail.com>
"""

import unittest
import can
import sys

from can.interfaces.ixxat import IXXATBus
from can.exceptions import CanInterfaceNotImplementedError


class SoftwareTestCase(unittest.TestCase):
Expand All @@ -15,10 +18,7 @@ class SoftwareTestCase(unittest.TestCase):
"""

def setUp(self):
try:
bus = can.Bus(interface="ixxat", channel=0)
bus.shutdown()
except can.CanInterfaceNotImplementedError:
if sys.platform != "win32" and sys.platform != "cygwin":
raise unittest.SkipTest("not available on this platform")

def test_bus_creation(self):
Expand All @@ -34,28 +34,60 @@ def test_bus_creation(self):
with self.assertRaises(ValueError):
can.Bus(interface="ixxat", channel=0, txFifoSize=0)

# non-existent channel (use arbitrary high value)
with self.assertRaises(can.CanInitializationError):
can.Bus(interface="ixxat", channel=0xFFFF)

def test_adapter_enumeration(self):
# Enumeration of adapters should always work (if the driver is installed) and the result should support len and be iterable
try:
adapters = IXXATBus.list_adapters()
except CanInterfaceNotImplementedError:
raise unittest.SkipTest("Maybe the driver is not installed.")

n = 0
for adapter in adapters:
n += 1
self.assertEqual(len(adapters), n)


class HardwareTestCase(unittest.TestCase):
"""
Test cases that rely on an existing/connected hardware.
THEY NEED TO BE EXECUTED WITH AT LEAST ONE CONNECTED ADAPTER!
"""

def setUp(self):
try:
bus = can.Bus(interface="ixxat", channel=0)
bus.shutdown()
except can.CanInterfaceNotImplementedError:
if sys.platform != "win32" and sys.platform != "cygwin":
raise unittest.SkipTest("not available on this platform")

def test_bus_creation(self):
# non-existent channel -> use arbitrary high value
with self.assertRaises(can.CanInitializationError):
can.Bus(interface="ixxat", channel=0xFFFF)
# Test the enumeration of all adapters by opening and closing each adapter
try:
adapters = IXXATBus.list_adapters()
except CanInterfaceNotImplementedError:
raise unittest.SkipTest("Maybe the driver is not installed.")

for adapter in adapters:
bus = can.Bus(interface="ixxat", adapter=adapter)
bus.shutdown()

def test_send_after_shutdown(self):
with can.Bus(interface="ixxat", channel=0) as bus:
with self.assertRaises(can.CanOperationError):
bus.send(can.Message(arbitration_id=0x3FF, dlc=0))
# At least one adapter is needed, skip the test if none can be found
try:
adapters = IXXATBus.list_adapters()
except CanInterfaceNotImplementedError:
raise unittest.SkipTest("Maybe the driver is not installed.")

if len(adapters) == 0:
raise unittest.SkipTest("No adapters found")

bus = can.Bus(interface="ixxat", channel=0)
msg = can.Message(arbitration_id=0x3FF, dlc=0)
# Intentionally close the bus now and try to send afterwards. This should lead to an CanOperationError
bus.shutdown()
with self.assertRaises(can.CanOperationError):
bus.send(msg)


if __name__ == "__main__":
Expand Down