Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions can/interfaces/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ def __new__(cls, other, channel=None, *args, **kwargs):
if can.rc['interface'] == 'socketcan':
can.rc['interface'] = choose_socketcan_implementation()

# Update can.rc from kwargs
for kw in ('interface', 'bitrate'):
if kw in kwargs:
can.rc[kw] = kwargs[kw]

if 'interface' not in can.rc or 'channel' not in can.rc or can.rc['interface'] is None:
can.log.debug("Loading default configuration")
# Load defaults
Expand Down
73 changes: 64 additions & 9 deletions can/interfaces/ixxat/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import binascii
import ctypes
import functools
import logging
import sys
import time
Expand All @@ -16,9 +17,10 @@

from can.ctypesutil import CLibrary, HANDLE, PHANDLE

from .constants import VCI_MAX_ERRSTRLEN
from .exceptions import *

__all__ = ["VCITimeout", "VCIError", "VCIDeviceNotFoundError", "IXXATBus"]
__all__ = ["VCITimeout", "VCIError", "VCIDeviceNotFoundError", "IXXATBus", "vciFormatError"]

log = logging.getLogger('can.ixxat')

Expand All @@ -27,6 +29,9 @@
else:
_timer_function = time.clock

# Hack to have vciFormatError as a free function, see below
vciFormatError = None

# main ctypes instance
if sys.platform == "win32":
try:
Expand All @@ -40,18 +45,69 @@
log.warning("IXXAT VCI library does not work on %s platform", sys.platform)
_canlib = None

def __vciFormatErrorExtended(library_instance, function, HRESULT, arguments):
""" Format a VCI error and attach failed function, decoded HRESULT and arguments
:param CLibrary library_instance:
Mapped instance of IXXAT vcinpl library
:param callable function:
Failed function
:param HRESULT HRESULT:
HRESULT returned by vcinpl call
:param arguments:
Arbitrary arguments tuple
:return:
Formatted string
"""
#TODO: make sure we don't generate another exception
return "{} - arguments were {}".format(
__vciFormatError(library_instance, function, HRESULT),
arguments
)

def __vciFormatError(library_instance, function, HRESULT):
""" Format a VCI error and attach failed function and decoded HRESULT
:param CLibrary library_instance:
Mapped instance of IXXAT vcinpl library
:param callable function:
Failed function
:param HRESULT HRESULT:
HRESULT returned by vcinpl call
:return:
Formatted string
"""
buf = ctypes.create_string_buffer(VCI_MAX_ERRSTRLEN)
ctypes.memset(buf, 0, VCI_MAX_ERRSTRLEN)
library_instance.vciFormatError(HRESULT, buf, VCI_MAX_ERRSTRLEN)
return "function {} failed ({})".format(function._name, buf.value.decode('utf-8', 'replace'))

def __check_status(result, function, arguments):
""" Check the result of a vcinpl function call and raise appropriate exception
in case of an error. Used as errcheck function when mapping C functions
with ctypes.
:param result:
Function call numeric result
:param callable function:
Called function
:param arguments:
Arbitrary arguments tuple
:raise:
:class:VCITimeout
:class:VCIRxQueueEmptyError
:class:StopIteration
:class:VCIError
"""
if isinstance(result, int):
# Real return value is an unsigned long
result = ctypes.c_ulong(result).value

if (result == constants.VCI_E_TIMEOUT):
raise VCITimeout("Function {} timed out".format(function._name))
elif (result == constants.VCI_E_RXQUEUE_EMPTY):
raise VCIRxQueueEmptyError()
elif (result == constants.VCI_E_NO_MORE_ITEMS):
raise StopIteration()
elif (result != constants.VCI_OK):
raise VCIError(function, result, arguments)
raise VCIError(vciFormatError(function, result))

return result

Expand All @@ -62,6 +118,8 @@ def __check_status(result, function, arguments):

#void VCIAPI vciFormatError (HRESULT hrError, PCHAR pszText, UINT32 dwsize);
_canlib.map_symbol("vciFormatError", None, (ctypes.HRESULT, ctypes.c_char_p, ctypes.c_uint32))
# Hack to have vciFormatError as a free function
vciFormatError = functools.partial(__vciFormatError, _canlib)

# HRESULT VCIAPI vciEnumDeviceOpen( OUT PHANDLE hEnum );
_canlib.map_symbol("vciEnumDeviceOpen", ctypes.c_long, (PHANDLE,), __check_status)
Expand Down Expand Up @@ -290,12 +348,12 @@ def _inWaiting(self):
return 1

def flush_tx_buffer(self):
" Flushes the transmit buffer on the IXXAT "
""" Flushes the transmit buffer on the IXXAT """
# TODO: no timeout?
_canlib.canChannelWaitTxEvent(self._channel_handle, constants.INFINITE)

def recv(self, timeout=None):
" Read a message from IXXAT device. "
""" Read a message from IXXAT device. """

# TODO: handling CAN error messages?
if (timeout is None):
Expand All @@ -308,11 +366,8 @@ def recv(self, timeout=None):
_canlib.canChannelPeekMessage(self._channel_handle, ctypes.byref(self._message))
except VCITimeout:
return None
except VCIError as e:
if (e.HRESULT == constants.VCI_E_RXQUEUE_EMPTY):
return None
else:
raise e
except VCIRxQueueEmptyError:
return None
else:
if (self._message.uMsgInfo.Bits.type == constants.CAN_MSGTYPE_DATA):
tm = _timer_function()
Expand Down
36 changes: 10 additions & 26 deletions can/interfaces/ixxat/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,24 @@
Copyright (C) 2016 Giuseppe Corbelli <giuseppe.corbelli@weightpack.com>
"""

import ctypes

from . import constants

from can import CanError

__all__ = ['VCITimeout', 'VCIError', 'VCIDeviceNotFoundError']
__all__ = ['VCITimeout', 'VCIError', 'VCIRxQueueEmptyError', 'VCIDeviceNotFoundError']

class VCITimeout(CanError):
""" Wraps the VCI_E_TIMEOUT error """
pass


class VCIError(CanError):
" Try to display errors that occur within the wrapped C library nicely. "

_ERROR_BUFFER = ctypes.create_string_buffer(constants.VCI_MAX_ERRSTRLEN)

def __init__(self, function, HRESULT, arguments):
super(VCIError, self).__init__()
self.HRESULT = HRESULT
self.function = function
self.arguments = arguments

def __str__(self):
return "function {} failed - {} - arguments were {}".format(
self.function.__name__,
self.__get_error_message(),
self.arguments
)

def __get_error_message(self):
ctypes.memset(self._ERROR_BUFFER, 0, constants.VCI_MAX_ERRSTRLEN)
vciFormatError(self.HRESULT, self._ERROR_BUFFER, constants.VCI_MAX_ERRSTRLEN)
return "{}".format(self._ERROR_BUFFER)
""" Try to display errors that occur within the wrapped C library nicely. """
pass


class VCIRxQueueEmptyError(VCIError):
""" Wraps the VCI_E_RXQUEUE_EMPTY error """
def __init__(self):
super(VCIRxQueueEmptyError, self).__init__("Receive queue is empty")


class VCIDeviceNotFoundError(CanError):
Expand Down
2 changes: 1 addition & 1 deletion doc/history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ The pcan interface was contributed by Albert Bloomfield in 2013.
The usb2can interface was contributed by Joshua Villyard in 2015

The IXXAT VCI interface was contributed by Giuseppe Corbelli and funded
by Weightpack in 2016
by `Weightpack <http://www.weightpack.com>`__ in 2016


Support for CAN within Python
Expand Down