Skip to content

Commit

Permalink
Add BitTiming/BitTimingFd support to VectorBus (#1470)
Browse files Browse the repository at this point in the history
* add BitTiming parameter to VectorBus

* use correct interface_version

* implement tests for bittiming classes with vector
  • Loading branch information
zariiii9003 committed Jan 27, 2023
1 parent d2abd34 commit 7a4c6f8
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 5 deletions.
51 changes: 46 additions & 5 deletions can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,20 @@

# Import Modules
# ==============
from can import BusABC, Message, CanInterfaceNotImplementedError, CanInitializationError
from can import (
BusABC,
Message,
CanInterfaceNotImplementedError,
CanInitializationError,
BitTiming,
BitTimingFd,
)
from can.util import (
len2dlc,
dlc2len,
deprecated_args_alias,
time_perfcounter_correlation,
check_or_adjust_timing_clock,
)
from can.typechecking import AutoDetectedConfig, CanFilters

Expand Down Expand Up @@ -86,6 +94,7 @@ def __init__(
can_filters: Optional[CanFilters] = None,
poll_interval: float = 0.01,
receive_own_messages: bool = False,
timing: Optional[Union[BitTiming, BitTimingFd]] = None,
bitrate: Optional[int] = None,
rx_queue_size: int = 2**14,
app_name: Optional[str] = "CANalyzer",
Expand All @@ -108,6 +117,15 @@ def __init__(
See :class:`can.BusABC`.
:param receive_own_messages:
See :class:`can.BusABC`.
:param timing:
An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd`
to specify the bit timing parameters for the VectorBus interface. The
`f_clock` value of the timing instance must be set to 16.000.000 (16MHz)
for standard CAN or 80.000.000 (80MHz) for CAN FD. If this parameter is provided,
it takes precedence over all other timing-related parameters.
Otherwise, the bit timing can be specified using the following parameters:
`bitrate` for standard CAN or `fd`, `data_bitrate`, `sjw_abr`, `tseg1_abr`,
`tseg2_abr`, `sjw_dbr`, `tseg1_dbr`, and `tseg2_dbr` for CAN FD.
:param poll_interval:
Poll interval in seconds.
:param bitrate:
Expand Down Expand Up @@ -184,7 +202,7 @@ def __init__(
channel_configs = get_channel_configs()

self.mask = 0
self.fd = fd
self.fd = isinstance(timing, BitTimingFd) if timing else fd
self.channel_masks: Dict[int, int] = {}
self.index_to_channel: Dict[int, int] = {}

Expand All @@ -204,12 +222,12 @@ def __init__(

permission_mask = xlclass.XLaccess()
# Set mask to request channel init permission if needed
if bitrate or fd:
if bitrate or fd or timing:
permission_mask.value = self.mask

interface_version = (
xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4
if fd
if self.fd
else xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION
)

Expand All @@ -233,7 +251,30 @@ def __init__(

# set CAN settings
for channel in self.channels:
if fd:
if isinstance(timing, BitTiming):
timing = check_or_adjust_timing_clock(timing, [16_000_000])
self._set_bitrate_can(
channel=channel,
bitrate=timing.bitrate,
sjw=timing.sjw,
tseg1=timing.tseg1,
tseg2=timing.tseg2,
sam=timing.nof_samples,
)
elif isinstance(timing, BitTimingFd):
timing = check_or_adjust_timing_clock(timing, [80_000_000])
self._set_bitrate_canfd(
channel=channel,
bitrate=timing.nom_bitrate,
data_bitrate=timing.data_bitrate,
sjw_abr=timing.nom_sjw,
tseg1_abr=timing.nom_tseg1,
tseg2_abr=timing.nom_tseg2,
sjw_dbr=timing.data_sjw,
tseg1_dbr=timing.data_tseg1,
tseg2_dbr=timing.data_tseg2,
)
elif fd:
self._set_bitrate_canfd(
channel=channel,
bitrate=bitrate,
Expand Down
148 changes: 148 additions & 0 deletions test/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,154 @@ def test_bus_creation_fd_bitrate_timings() -> None:
bus.shutdown()


def test_bus_creation_timing_mocked(mock_xldriver) -> None:
timing = can.BitTiming.from_bitrate_and_segments(
f_clock=16_000_000,
bitrate=125_000,
tseg1=13,
tseg2=2,
sjw=1,
)
bus = can.Bus(channel=0, interface="vector", timing=timing, _testing=True)
assert isinstance(bus, canlib.VectorBus)
can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called()
can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called()

can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called()
xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0]
assert xlOpenPort_args[5] == xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION.value
assert xlOpenPort_args[6] == xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value

can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.assert_not_called()
can.interfaces.vector.canlib.xldriver.xlCanSetChannelParams.assert_called()
chip_params = (
can.interfaces.vector.canlib.xldriver.xlCanSetChannelParams.call_args[0]
)[2]
assert chip_params.bitRate == 125_000
assert chip_params.sjw == 1
assert chip_params.tseg1 == 13
assert chip_params.tseg2 == 2
assert chip_params.sam == 1


@pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable")
def test_bus_creation_timing() -> None:
timing = can.BitTiming.from_bitrate_and_segments(
f_clock=16_000_000,
bitrate=125_000,
tseg1=13,
tseg2=2,
sjw=1,
)
bus = can.Bus(
channel=0,
serial=_find_virtual_can_serial(),
interface="vector",
timing=timing,
)
assert isinstance(bus, canlib.VectorBus)

xl_channel_config = _find_xl_channel_config(
serial=_find_virtual_can_serial(), channel=0
)
assert xl_channel_config.busParams.data.can.bitRate == 125_000
assert xl_channel_config.busParams.data.can.sjw == 1
assert xl_channel_config.busParams.data.can.tseg1 == 13
assert xl_channel_config.busParams.data.can.tseg2 == 2

bus.shutdown()


def test_bus_creation_timingfd_mocked(mock_xldriver) -> None:
timing = can.BitTimingFd.from_bitrate_and_segments(
f_clock=80_000_000,
nom_bitrate=500_000,
nom_tseg1=68,
nom_tseg2=11,
nom_sjw=10,
data_bitrate=2_000_000,
data_tseg1=10,
data_tseg2=9,
data_sjw=8,
)
bus = can.Bus(
channel=0,
interface="vector",
timing=timing,
_testing=True,
)
assert isinstance(bus, canlib.VectorBus)
can.interfaces.vector.canlib.xldriver.xlOpenDriver.assert_called()
can.interfaces.vector.canlib.xldriver.xlGetApplConfig.assert_called()

can.interfaces.vector.canlib.xldriver.xlOpenPort.assert_called()
xlOpenPort_args = can.interfaces.vector.canlib.xldriver.xlOpenPort.call_args[0]
assert (
xlOpenPort_args[5] == xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4.value
)

assert xlOpenPort_args[6] == xldefine.XL_BusTypes.XL_BUS_TYPE_CAN.value

can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.assert_called()
can.interfaces.vector.canlib.xldriver.xlCanSetChannelBitrate.assert_not_called()

xlCanFdSetConfiguration_args = (
can.interfaces.vector.canlib.xldriver.xlCanFdSetConfiguration.call_args[0]
)
canFdConf = xlCanFdSetConfiguration_args[2]
assert canFdConf.arbitrationBitRate == 500_000
assert canFdConf.dataBitRate == 2_000_000
assert canFdConf.sjwAbr == 10
assert canFdConf.tseg1Abr == 68
assert canFdConf.tseg2Abr == 11
assert canFdConf.sjwDbr == 8
assert canFdConf.tseg1Dbr == 10
assert canFdConf.tseg2Dbr == 9


@pytest.mark.skipif(not XLDRIVER_FOUND, reason="Vector XL API is unavailable")
def test_bus_creation_timingfd() -> None:
timing = can.BitTimingFd.from_bitrate_and_segments(
f_clock=80_000_000,
nom_bitrate=500_000,
nom_tseg1=68,
nom_tseg2=11,
nom_sjw=10,
data_bitrate=2_000_000,
data_tseg1=10,
data_tseg2=9,
data_sjw=8,
)
bus = can.Bus(
channel=0,
serial=_find_virtual_can_serial(),
interface="vector",
timing=timing,
)

xl_channel_config = _find_xl_channel_config(
serial=_find_virtual_can_serial(), channel=0
)
assert (
xl_channel_config.interfaceVersion
== xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4
)
assert (
xl_channel_config.busParams.data.canFD.canOpMode
& xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD
)
assert xl_channel_config.busParams.data.canFD.arbitrationBitRate == 500_000
assert xl_channel_config.busParams.data.canFD.sjwAbr == 10
assert xl_channel_config.busParams.data.canFD.tseg1Abr == 68
assert xl_channel_config.busParams.data.canFD.tseg2Abr == 11
assert xl_channel_config.busParams.data.canFD.sjwDbr == 8
assert xl_channel_config.busParams.data.canFD.tseg1Dbr == 10
assert xl_channel_config.busParams.data.canFD.tseg2Dbr == 9
assert xl_channel_config.busParams.data.canFD.dataBitRate == 2_000_000

bus.shutdown()


def test_send_mocked(mock_xldriver) -> None:
bus = can.Bus(channel=0, interface="vector", _testing=True)
msg = can.Message(
Expand Down

0 comments on commit 7a4c6f8

Please sign in to comment.