Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add device_id parameter to PcanBus constructor #1346

Merged
merged 3 commits into from
Jul 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion can/interfaces/pcan/pcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class PcanBus(BusABC):
def __init__(
self,
channel="PCAN_USBBUS1",
device_id=None,
state=BusState.ACTIVE,
bitrate=500000,
*args,
Expand All @@ -119,6 +120,14 @@ def __init__(
Alternatively the value can be an int with the numerical value.
Default is 'PCAN_USBBUS1'

:param int device_id:
Select the PCAN interface based on its ID. The device ID is a 8/32bit
value that can be configured for each PCAN device. If you set the
device_id parameter, it takes precedence over the channel parameter.
The constructor searches all connected interfaces and initializes the
first one that matches the parameter value. If no device is found,
an exception is raised.

:param can.bus.BusState state:
BusState of the channel.
Default is ACTIVE
Expand Down Expand Up @@ -198,6 +207,15 @@ def __init__(
Ignored if not using CAN-FD.

"""
self.m_objPCANBasic = PCANBasic()

if device_id is not None:
channel = self._find_channel_by_dev_id(device_id)

if channel is None:
err_msg = "Cannot find a channel with ID {:08x}".format(device_id)
raise ValueError(err_msg)

self.channel_info = str(channel)
self.fd = kwargs.get("fd", False)
pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K)
Expand All @@ -209,7 +227,6 @@ def __init__(
if not isinstance(channel, int):
channel = PCAN_CHANNEL_NAMES[channel]

self.m_objPCANBasic = PCANBasic()
self.m_PcanHandle = channel

self.check_api_version()
Expand Down Expand Up @@ -269,6 +286,28 @@ def __init__(

super().__init__(channel=channel, state=state, bitrate=bitrate, *args, **kwargs)

def _find_channel_by_dev_id(self, device_id):
"""
Iterate over all possible channels to find a channel that matches the device
ID. This method is somewhat brute force, but the Basic API only offers a
suitable API call since V4.4.0.

:param device_id: The device_id for which to search for
:return: The name of a PCAN channel that matches the device ID, or None if
no channel can be found.
"""
for ch_name, ch_handle in PCAN_CHANNEL_NAMES.items():
err, cur_dev_id = self.m_objPCANBasic.GetValue(
ch_handle, PCAN_DEVICE_NUMBER
)
if err != PCAN_ERROR_OK:
continue

if cur_dev_id == device_id:
return ch_name

return None

def _get_formatted_error(self, error):
"""
Gets the text using the GetErrorText API function.
Expand Down
19 changes: 19 additions & 0 deletions test/test_pcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,25 @@ def test_status_string(self, name, status, expected_result) -> None:
self.assertEqual(self.bus.status_string(), expected_result)
self.mock_pcan.GetStatus.assert_called()

@parameterized.expand([(0x0, "error"), (0x42, "PCAN_USBBUS8")])
def test_constructor_with_device_id(self, dev_id, expected_result):
def get_value_side_effect(handle, param):
if param == PCAN_API_VERSION:
return PCAN_ERROR_OK, self.PCAN_API_VERSION_SIM.encode("ascii")

if handle in (PCAN_USBBUS8, PCAN_USBBUS14):
return 0, 0x42
else:
return PCAN_ERROR_ILLHW, 0x0

self.mock_pcan.GetValue = Mock(side_effect=get_value_side_effect)

if expected_result == "error":
self.assertRaises(ValueError, can.Bus, bustype="pcan", device_id=dev_id)
else:
self.bus = can.Bus(bustype="pcan", device_id=dev_id)
self.assertEqual(expected_result, self.bus.channel_info)


if __name__ == "__main__":
unittest.main()