Skip to content

Commit

Permalink
VectorBus init refactoring (#1389)
Browse files Browse the repository at this point in the history
* refactor VectorBus.__init__()

* move bitrate methods below __init__(), fix typo

* refactor channel index search into method '_find_global_channel_idx', improve error messages
  • Loading branch information
zariiii9003 committed Sep 11, 2022
1 parent 40f6cce commit 1e11f21
Showing 1 changed file with 200 additions and 116 deletions.
316 changes: 200 additions & 116 deletions can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Any,
Dict,
Callable,
cast,
)

WaitForSingleObject: Optional[Callable[[int, int], int]]
Expand All @@ -43,7 +44,7 @@
deprecated_args_alias,
time_perfcounter_correlation,
)
from can.typechecking import AutoDetectedConfig, CanFilters, Channel
from can.typechecking import AutoDetectedConfig, CanFilters

# Define Module Logger
# ====================
Expand Down Expand Up @@ -152,6 +153,7 @@ def __init__(
if xldriver is None:
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")
self.xldriver = xldriver # keep reference so mypy knows it is not None
self.xldriver.xlOpenDriver()

self.poll_interval = poll_interval

Expand All @@ -165,7 +167,7 @@ def __init__(
self.channels = [int(ch) for ch in channel]
else:
raise TypeError(
f"Invalid type for channels parameter: {type(channel).__name__}"
f"Invalid type for parameter 'channel': {type(channel).__name__}"
)

self._app_name = app_name.encode() if app_name is not None else b""
Expand All @@ -174,136 +176,71 @@ def __init__(
", ".join(f"CAN {ch + 1}" for ch in self.channels),
)

if serial is not None:
app_name = None
channel_index = []
channel_configs = get_channel_configs()
for channel_config in channel_configs:
if channel_config.serialNumber == serial:
if channel_config.hwChannel in self.channels:
channel_index.append(channel_config.channelIndex)
if channel_index:
if len(channel_index) != len(self.channels):
LOG.info(
"At least one defined channel wasn't found on the specified hardware."
)
self.channels = channel_index
else:
# Is there any better way to raise the error?
raise CanInitializationError(
"None of the configured channels could be found on the specified hardware."
)
channel_configs = get_channel_configs()

self.xldriver.xlOpenDriver()
self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE)
self.mask = 0
self.fd = fd
# Get channels masks
self.channel_masks: Dict[Optional[Channel], int] = {}
self.index_to_channel = {}
self.channel_masks: Dict[int, int] = {}
self.index_to_channel: Dict[int, int] = {}

for channel in self.channels:
if app_name:
# Get global channel index from application channel
hw_type, hw_index, hw_channel = self.get_application_config(
app_name, channel
)
LOG.debug("Channel index %d found", channel)
idx = self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
if idx < 0:
# Undocumented behavior! See issue #353.
# If hardware is unavailable, this function returns -1.
# Raise an exception as if the driver
# would have signalled XL_ERR_HW_NOT_PRESENT.
raise VectorInitializationError(
xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT.name,
"xlGetChannelIndex",
)
else:
# Channel already given as global channel
idx = channel
mask = 1 << idx
self.channel_masks[channel] = mask
self.index_to_channel[idx] = channel
self.mask |= mask
channel_index = self._find_global_channel_idx(
channel=channel,
serial=serial,
app_name=app_name,
channel_configs=channel_configs,
)
LOG.debug("Channel index %d found", channel)

channel_mask = 1 << channel_index
self.channel_masks[channel] = channel_mask
self.index_to_channel[channel_index] = channel
self.mask |= channel_mask

permission_mask = xlclass.XLaccess()
# Set mask to request channel init permission if needed
if bitrate or fd:
permission_mask.value = self.mask
if fd:
self.xldriver.xlOpenPort(
self.port_handle,
self._app_name,
self.mask,
permission_mask,
rx_queue_size,
xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4,
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
)
else:
self.xldriver.xlOpenPort(
self.port_handle,
self._app_name,
self.mask,
permission_mask,
rx_queue_size,
xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION,
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
)

interface_version = (
xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4
if fd
else xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION
)

self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE)
self.xldriver.xlOpenPort(
self.port_handle,
self._app_name,
self.mask,
permission_mask,
rx_queue_size,
interface_version,
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
)

LOG.debug(
"Open Port: PortHandle: %d, PermissionMask: 0x%X",
self.port_handle.value,
permission_mask.value,
)

if permission_mask.value == self.mask:
if fd:
self.canFdConf = xlclass.XLcanFdConf()
if bitrate:
self.canFdConf.arbitrationBitRate = int(bitrate)
else:
self.canFdConf.arbitrationBitRate = 500000
self.canFdConf.sjwAbr = int(sjw_abr)
self.canFdConf.tseg1Abr = int(tseg1_abr)
self.canFdConf.tseg2Abr = int(tseg2_abr)
if data_bitrate:
self.canFdConf.dataBitRate = int(data_bitrate)
else:
self.canFdConf.dataBitRate = self.canFdConf.arbitrationBitRate
self.canFdConf.sjwDbr = int(sjw_dbr)
self.canFdConf.tseg1Dbr = int(tseg1_dbr)
self.canFdConf.tseg2Dbr = int(tseg2_dbr)

self.xldriver.xlCanFdSetConfiguration(
self.port_handle, self.mask, self.canFdConf
)
LOG.info(
"SetFdConfig.: ABaudr.=%u, DBaudr.=%u",
self.canFdConf.arbitrationBitRate,
self.canFdConf.dataBitRate,
)
LOG.info(
"SetFdConfig.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
self.canFdConf.sjwAbr,
self.canFdConf.tseg1Abr,
self.canFdConf.tseg2Abr,
)
LOG.info(
"SetFdConfig.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u",
self.canFdConf.sjwDbr,
self.canFdConf.tseg1Dbr,
self.canFdConf.tseg2Dbr,
)
else:
if bitrate:
self.xldriver.xlCanSetChannelBitrate(
self.port_handle, permission_mask, bitrate
for channel in self.channels:
if permission_mask.value & self.channel_masks[channel]:
if fd:
self._set_bitrate_canfd(
channel=channel,
bitrate=bitrate,
data_bitrate=data_bitrate,
sjw_abr=sjw_abr,
tseg1_abr=tseg1_abr,
tseg2_abr=tseg2_abr,
sjw_dbr=sjw_dbr,
tseg1_dbr=tseg1_dbr,
tseg2_dbr=tseg2_dbr,
)
LOG.info("SetChannelBitrate: baudr.=%u", bitrate)
else:
LOG.info("No init access!")
elif bitrate:
self._set_bitrate_can(channel=channel, bitrate=bitrate)

# Enable/disable TX receipts
tx_receipts = 1 if receive_own_messages else 0
Expand Down Expand Up @@ -348,6 +285,153 @@ def __init__(
self._is_filtered = False
super().__init__(channel=channel, can_filters=can_filters, **kwargs)

def _find_global_channel_idx(
self,
channel: int,
serial: Optional[int],
app_name: Optional[str],
channel_configs: List["VectorChannelConfig"],
) -> int:
if serial is not None:
hw_type: Optional[xldefine.XL_HardwareType] = None
for channel_config in channel_configs:
if channel_config.serialNumber != serial:
continue

hw_type = xldefine.XL_HardwareType(channel_config.hwType)
if channel_config.hwChannel == channel:
return channel_config.channelIndex

if hw_type is None:
err_msg = f"No interface with serial {serial} found."
else:
err_msg = f"Channel {channel} not found on interface {hw_type.name} ({serial})."
raise CanInitializationError(
err_msg, error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT
)

if app_name:
hw_type, hw_index, hw_channel = self.get_application_config(
app_name, channel
)
idx = cast(
int, self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
)
if idx < 0:
# Undocumented behavior! See issue #353.
# If hardware is unavailable, this function returns -1.
# Raise an exception as if the driver
# would have signalled XL_ERR_HW_NOT_PRESENT.
raise VectorInitializationError(
xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT.name,
"xlGetChannelIndex",
)
return idx

# check if channel is a valid global channel index
for channel_config in channel_configs:
if channel == channel_config.channelIndex:
return channel

raise CanInitializationError(
f"Channel {channel} not found. The 'channel' parameter must be "
f"a valid global channel index if neither 'app_name' nor 'serial' were given.",
error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
)

def _set_bitrate_can(
self,
channel: int,
bitrate: int,
sjw: Optional[int] = None,
tseg1: Optional[int] = None,
tseg2: Optional[int] = None,
sam: int = 1,
) -> None:
kwargs = [sjw, tseg1, tseg2]
if any(kwargs) and not all(kwargs):
raise ValueError(
f"Either all of sjw, tseg1, tseg2 must be set or none of them."
)

# set parameters if channel has init access
if any(kwargs):
chip_params = xlclass.XLchipParams()
chip_params.bitRate = bitrate
chip_params.sjw = sjw
chip_params.tseg1 = tseg1
chip_params.tseg2 = tseg2
chip_params.sam = sam
self.xldriver.xlCanSetChannelParams(
self.port_handle,
self.channel_masks[channel],
chip_params,
)
LOG.info(
"xlCanSetChannelParams: baudr.=%u, sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
chip_params.bitRate,
chip_params.sjw,
chip_params.tseg1,
chip_params.tseg2,
)
else:
self.xldriver.xlCanSetChannelBitrate(
self.port_handle,
self.channel_masks[channel],
bitrate,
)
LOG.info("xlCanSetChannelBitrate: baudr.=%u", bitrate)

def _set_bitrate_canfd(
self,
channel: int,
bitrate: Optional[int] = None,
data_bitrate: Optional[int] = None,
sjw_abr: int = 2,
tseg1_abr: int = 6,
tseg2_abr: int = 3,
sjw_dbr: int = 2,
tseg1_dbr: int = 6,
tseg2_dbr: int = 3,
) -> None:
# set parameters if channel has init access
canfd_conf = xlclass.XLcanFdConf()
if bitrate:
canfd_conf.arbitrationBitRate = int(bitrate)
else:
canfd_conf.arbitrationBitRate = 500_000
canfd_conf.sjwAbr = int(sjw_abr)
canfd_conf.tseg1Abr = int(tseg1_abr)
canfd_conf.tseg2Abr = int(tseg2_abr)
if data_bitrate:
canfd_conf.dataBitRate = int(data_bitrate)
else:
canfd_conf.dataBitRate = int(canfd_conf.arbitrationBitRate)
canfd_conf.sjwDbr = int(sjw_dbr)
canfd_conf.tseg1Dbr = int(tseg1_dbr)
canfd_conf.tseg2Dbr = int(tseg2_dbr)
self.xldriver.xlCanFdSetConfiguration(
self.port_handle, self.channel_masks[channel], canfd_conf
)
LOG.info(
"xlCanFdSetConfiguration.: ABaudr.=%u, DBaudr.=%u",
canfd_conf.arbitrationBitRate,
canfd_conf.dataBitRate,
)
LOG.info(
"xlCanFdSetConfiguration.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
canfd_conf.sjwAbr,
canfd_conf.tseg1Abr,
canfd_conf.tseg2Abr,
)
LOG.info(
"xlCanFdSetConfiguration.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u",
canfd_conf.sjwDbr,
canfd_conf.tseg1Dbr,
canfd_conf.tseg2Dbr,
)

def _apply_filters(self, filters: Optional[CanFilters]) -> None:
if filters:
# Only up to one filter per ID type allowed
Expand Down Expand Up @@ -544,7 +628,7 @@ def _send_sequence(self, msgs: Sequence[Message]) -> int:

def _get_tx_channel_mask(self, msgs: Sequence[Message]) -> int:
if len(msgs) == 1:
return self.channel_masks.get(msgs[0].channel, self.mask)
return self.channel_masks.get(msgs[0].channel, self.mask) # type: ignore[arg-type]
else:
return self.mask

Expand Down

0 comments on commit 1e11f21

Please sign in to comment.