A python-can integration for the CANsub CAN bus interface family by CSS Electronics. Source on GitHub.
This package registers the CANsub as a standard python-can interface, making it compatible with all python-can tools and workflows. It also adds a CSV logger compatible with the webCAN browser tool provided with the device.
Tip: This README is optimized for LLMs. When using an AI coding assistant with this package, provide this file as context for accurate results.
pip install python-can-cansubWhen python-can-cansub is installed, the cansub interface is automatically registered with python-can. Import with:
import canPython-can defines a hardware configuration by an interface and a channel (a single interface can have multiple channels).
The CANsub interface is fixed "cansub". The channel is constructed from the device hostname (unique) and channel index.
| Connection | Hostname | python-can channel string |
|---|---|---|
| USB | [DEVICE-ID]-usb.local |
[DEVICE-ID]-usb.local@[channel] |
| Ethernet | [DEVICE-ID]-eth.local |
[DEVICE-ID]-eth.local@[channel] |
The device-ID is printed on the device label. Channel indexing is 1-based - the first channel is 1.
A configuration is passed to can.Bus to open a bus.
Example of a fixed configuration:
configs = [{"interface": "cansub", "channel": "aabbccdd-usb.local@1"},
{"interface": "cansub", "channel": "aabbccdd-usb.local@2"}]Example of using detect_available_configs to automatically discover (uses mDNS) all connected CANsub devices and channels:
configs = can.detect_available_configs(interfaces=["cansub"])
# e.g. [{"interface": "cansub", "channel": "aabbccdd-usb.local@1"},
# {"interface": "cansub", "channel": "aabbccdd-usb.local@2"}
# {"interface": "cansub", "channel": "11223344-eth.local@1"}
# {"interface": "cansub", "channel": "11223344-eth.local@2"}]In the above example two CANsub devices are detected, each with two channels. One device is connected via USB and the other via Ethernet.
with can.Bus(interface="cansub", channel="aabbccdd-usb.local@1", bitrate=250_000, data_bitrate=1_000_000) as bus:
passwith can.Bus(interface=configs[0]["interface"], channel=configs[0]["channel"], bitrate=250_000, data_bitrate=1_000_000) as bus:
passwith (can.Bus(interface=configs[0]["interface"], channel=configs[0]["channel"], bitrate=250_000, data_bitrate=1_000_000) as bus1,
can.Bus(interface=configs[1]["interface"], channel=configs[1]["channel"], bitrate=250_000, data_bitrate=1_000_000) as bus2):
passTip:
**configunpacks a config dict directly intocan.Buskeyword arguments:with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus: pass
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
# Transmit
msg_tx = can.Message(is_extended_id=False, arbitration_id=0x123, data=[0x01, 0x02, 0x03, 0x04])
bus.send(msg_tx)
# Receive with timeout
msg_rx = bus.recv(timeout=1.0)
print(msg_rx)Apply hardware filters by passing can_filters to can.Bus. Each filter specifies a can_id, a can_mask, and whether to match standard (extended=False) or extended (extended=True) frames. A frame passes if (frame_id & can_mask) == (can_id & can_mask).
filters = [
{"can_id": 0x123, "can_mask": 0x7FF, "extended": False}, # standard frames, exact ID match
{"can_id": 0x000, "can_mask": 0x000, "extended": True}, # all extended frames
]
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000, can_filters=filters) as bus:
msg = bus.recv(timeout=1.0)
print(msg)Tip: Applying hardware filters reduces the network load between the CANsub and the connected client.
bus.recv() blocks until a frame arrives. A can.Notifier runs a background thread that dispatches received frames to one or more listeners, allowing the main program to continue other work.
python-can provides built-in listeners including can.Printer (print to stdout) and can.Logger (log to file). The example below logs to a CSV file while the main program continues. Custom listeners can be implemented by subclassing can.Listener.
from time import sleep
print_listener = can.Printer()
csv_listener = can.Logger("log.csv")
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
with can.Notifier([bus], listeners=[print_listener, csv_listener]):
# Perform other tasks here while frames are received in the background
sleep(10)Periodic transmission jobs can be started with bus.send_periodic().
Most periodic transmission job types can be offloaded to the CANsub hardware, providing much better transmission time accuracy (compared to a host-scheduled transmission). A host-side background task is used only as a fallback when hardware transmission is not available.
from time import sleep
msgs = [
can.Message(is_extended_id=False, arbitration_id=0x123, data=[0x01, 0x02, 0x03, 0x04]),
can.Message(is_extended_id=False, arbitration_id=0x124, data=[0x05, 0x06, 0x07, 0x08]),
can.Message(is_extended_id=False, arbitration_id=0x125, data=[0x09, 0x0A, 0x0B, 0x0C]),
]
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
# period: time between individual frames (sequence repeats every len(msgs) * period)
# duration: total transmission time in seconds (None = transmit indefinitely)
task = bus.send_periodic(msgs, period=0.1, duration=5.0)
# Perform other tasks here while frames are transmitted in the background
sleep(6)can.MessageSync can be used to replay messages from a log file.
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
with can.LogReader("log.csv") as reader:
for msg in can.MessageSync(messages=reader):
bus.send(msg)python-can includes several command line tools. All tools accept --interface and --channel to select the bus, following the same configuration as the API.
The common argument pattern for the CANsub:
--interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000
Log received frames to a file (CSV by default; format inferred from file extension):
can_logger --interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000 --output-file log.csvPlay back a previously recorded log file:
can_player --interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000 log.csvLive terminal viewer showing received frames, updated counts, timestamps, and byte-level changes:
can_viewer --interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000Forward all frames received on one bus to another (e.g. bridge two CANsub channels):
can_bridge --interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000 \
--interface2 cansub --channel2 aabbccdd-usb.local@2 --bitrate2 250000 --data-bitrate2 1000000Convert a log file between formats; the format is inferred from the file extension:
can_logconvert log.csv log.ascThe following packages complement python-can-cansub and are included here as inspiration for working with CAN data in Python.
cantools is a Python package for encoding and decoding CAN messages. Encoding/decoding rules can be created or loaded from DBC (and other) database files. It works directly with can.Message objects from python-can.
pip install cantoolsA database can be constructed directly in Python without a database file:
import cantools
db = cantools.database.Database()
msg_def = cantools.database.can.Message(
frame_id=0x123,
name="Message1",
length=8,
signals=[
cantools.database.can.Signal(name="Signal1", start=0, length=16, scale=0.1, offset=0.0, minimum=0.0, maximum=100.0),
cantools.database.can.Signal(name="Signal2", start=16, length=16, scale=0.1, offset=0.0, minimum=0.0, maximum=100.0),
]
)
db.add_message(msg_def)import cantools
db = cantools.database.load_file("database.dbc")
msg_def = db.get_message_by_name("Message1")Encode signal values into the byte payload of a can.Message:
data = msg_def.encode({"Signal1": 1.0, "Signal2": 42.5})
msg_tx = can.Message(arbitration_id=msg_def.frame_id,
is_extended_id=msg_def.is_extended_frame,
data=data)
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
bus.send(msg_tx)Decode the byte payload of a received can.Message back into signal values:
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
msg_rx = bus.recv(timeout=1.0)
if msg_rx:
signals = db.decode_message(msg_rx.arbitration_id, msg_rx.data)
print(signals) # e.g. {'Signal1': 1.0, 'Signal2': 42.5}asammdf is a Python package for reading and writing MDF (Measurement Data Format) files.
When asammdf is installed, python-can automatically gains support for reading MDF log files via can.LogReader, allowing MDF recordings to be played back directly using can.MessageSync:
pip install asammdfwith can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
with can.LogReader("recording.mf4") as reader:
for msg in can.MessageSync(messages=reader):
bus.send(msg)