Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 101 additions & 21 deletions can/interfaces/socketcan/socketcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,84 @@ def get_addr(sock, channel):
return struct.pack("HiLL", AF_CAN, idx, 0, 0)


# Setup BCM struct
def bcm_header_factory(fields, alignment=8):
curr_stride = 0
results = []
pad_index = 0
for field in fields:
field_alignment = ctypes.alignment(field[1])
field_size = ctypes.sizeof(field[1])

# If the current stride index isn't a multiple of the alignment
# requirements of this field, then we must add padding bytes until we
# are aligned
while curr_stride % field_alignment != 0:
results.append(("pad_{}".format(pad_index), ctypes.c_uint8))
pad_index += 1
curr_stride += 1

# Now can it fit?
# Example: If this is 8 bytes and the type requires 4 bytes alignment
# then we can only fit when we're starting at 0. Otherwise, we will
# split across 2 strides.
#
# | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
results.append(field)
curr_stride += field_size

# Add trailing padding to align to a multiple of the largest scalar member
# in the structure
while curr_stride % alignment != 0:
results.append(("pad_{}".format(pad_index), ctypes.c_uint8))
pad_index += 1
curr_stride += 1

return type("BcmMsgHead", (ctypes.Structure,), {"_fields_": results})

# The fields definition is taken from the C struct definitions in
# <linux/can/bcm.h>
#
# struct bcm_timeval {
# long tv_sec;
# long tv_usec;
# };
#
# /**
# * struct bcm_msg_head - head of messages to/from the broadcast manager
# * @opcode: opcode, see enum below.
# * @flags: special flags, see below.
# * @count: number of frames to send before changing interval.
# * @ival1: interval for the first @count frames.
# * @ival2: interval for the following frames.
# * @can_id: CAN ID of frames to be sent or received.
# * @nframes: number of frames appended to the message head.
# * @frames: array of CAN frames.
# */
# struct bcm_msg_head {
# __u32 opcode;
# __u32 flags;
# __u32 count;
# struct bcm_timeval ival1, ival2;
# canid_t can_id;
# __u32 nframes;
# struct can_frame frames[0];
# };
BcmMsgHead = bcm_header_factory(
fields=[
("opcode", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("count", ctypes.c_uint32),
("ival1_tv_sec", ctypes.c_long),
("ival1_tv_usec", ctypes.c_long),
("ival2_tv_sec", ctypes.c_long),
("ival2_tv_usec", ctypes.c_long),
("can_id", ctypes.c_uint32),
("nframes", ctypes.c_uint32),
]
)


# struct module defines a binary packing format:
# https://docs.python.org/3/library/struct.html#struct-format-strings
# The 32bit can id is directly followed by the 8bit data link count
Expand Down Expand Up @@ -118,27 +196,29 @@ def build_can_frame(msg):
return CAN_FRAME_HEADER_STRUCT.pack(can_id, msg.dlc, flags) + data


def build_bcm_header(opcode, flags, count, ival1_seconds, ival1_usec, ival2_seconds, ival2_usec, can_id, nframes):
# == Must use native not standard types for packing ==
# struct bcm_msg_head {
# __u32 opcode; -> I
# __u32 flags; -> I
# __u32 count; -> I
# struct timeval ival1, ival2; -> llll ...
# canid_t can_id; -> I
# __u32 nframes; -> I
bcm_cmd_msg_fmt = "@3I4l2I0q"

return struct.pack(bcm_cmd_msg_fmt,
opcode,
flags,
count,
ival1_seconds,
ival1_usec,
ival2_seconds,
ival2_usec,
can_id,
nframes)
def build_bcm_header(
opcode,
flags,
count,
ival1_seconds,
ival1_usec,
ival2_seconds,
ival2_usec,
can_id,
nframes,
):
result = BcmMsgHead(
opcode=opcode,
flags=flags,
count=count,
ival1_tv_sec=ival1_seconds,
ival1_tv_usec=ival1_usec,
ival2_tv_sec=ival2_seconds,
ival2_tv_usec=ival2_usec,
can_id=can_id,
nframes=nframes,
)
return ctypes.string_at(ctypes.addressof(result), ctypes.sizeof(result))


def build_bcm_tx_delete_header(can_id, flags):
Expand Down
234 changes: 234 additions & 0 deletions test/test_socketcan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
"""
Test functions in `can.interfaces.socketcan.socketcan`.
"""
import unittest

try:
from unittest.mock import Mock
from unittest.mock import patch
from unittest.mock import call
except ImportError:
from mock import Mock
from mock import patch
from mock import call

import ctypes

from can.interfaces.socketcan.socketcan import bcm_header_factory


class SocketCANTest(unittest.TestCase):
def setUp(self):
self._ctypes_sizeof = ctypes.sizeof
self._ctypes_alignment = ctypes.alignment

@patch("ctypes.sizeof")
@patch("ctypes.alignment")
def test_bcm_header_factory_32_bit_sizeof_long_4_alignof_long_4(
self, ctypes_sizeof, ctypes_alignment
):
"""This tests a 32-bit platform (ex. Debian Stretch on i386), where:

* sizeof(long) == 4
* sizeof(long long) == 8
* alignof(long) == 4
* alignof(long long) == 4
"""

def side_effect_ctypes_sizeof(value):
type_to_size = {
ctypes.c_longlong: 8,
ctypes.c_long: 4,
ctypes.c_uint8: 1,
ctypes.c_uint16: 2,
ctypes.c_uint32: 4,
ctypes.c_uint64: 8,
}
return type_to_size[value]

def side_effect_ctypes_alignment(value):
type_to_alignment = {
ctypes.c_longlong: 4,
ctypes.c_long: 4,
ctypes.c_uint8: 1,
ctypes.c_uint16: 2,
ctypes.c_uint32: 4,
ctypes.c_uint64: 4,
}
return type_to_alignment[value]

ctypes_sizeof.side_effect = side_effect_ctypes_sizeof
ctypes_alignment.side_effect = side_effect_ctypes_alignment

fields = [
("opcode", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("count", ctypes.c_uint32),
("ival1_tv_sec", ctypes.c_long),
("ival1_tv_usec", ctypes.c_long),
("ival2_tv_sec", ctypes.c_long),
("ival2_tv_usec", ctypes.c_long),
("can_id", ctypes.c_uint32),
("nframes", ctypes.c_uint32),
]
BcmMsgHead = bcm_header_factory(fields)

expected_fields = [
("opcode", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("count", ctypes.c_uint32),
("ival1_tv_sec", ctypes.c_long),
("ival1_tv_usec", ctypes.c_long),
("ival2_tv_sec", ctypes.c_long),
("ival2_tv_usec", ctypes.c_long),
("can_id", ctypes.c_uint32),
("nframes", ctypes.c_uint32),
# We expect 4 bytes of padding
("pad_0", ctypes.c_uint8),
("pad_1", ctypes.c_uint8),
("pad_2", ctypes.c_uint8),
("pad_3", ctypes.c_uint8),
]
self.assertEqual(expected_fields, BcmMsgHead._fields_)

@patch("ctypes.sizeof")
@patch("ctypes.alignment")
def test_bcm_header_factory_32_bit_sizeof_long_4_alignof_long_8(
self, ctypes_sizeof, ctypes_alignment
):
"""This tests a 32-bit platform (ex. Raspbian Stretch on armv7l), where:

* sizeof(long) == 4
* sizeof(long long) == 8
* alignof(long) == 4
* alignof(long long) == 8
"""

def side_effect_ctypes_sizeof(value):
type_to_size = {
ctypes.c_longlong: 8,
ctypes.c_long: 4,
ctypes.c_uint8: 1,
ctypes.c_uint16: 2,
ctypes.c_uint32: 4,
ctypes.c_uint64: 8,
}
return type_to_size[value]

def side_effect_ctypes_alignment(value):
type_to_alignment = {
ctypes.c_longlong: 8,
ctypes.c_long: 4,
ctypes.c_uint8: 1,
ctypes.c_uint16: 2,
ctypes.c_uint32: 4,
ctypes.c_uint64: 8,
}
return type_to_alignment[value]

ctypes_sizeof.side_effect = side_effect_ctypes_sizeof
ctypes_alignment.side_effect = side_effect_ctypes_alignment

fields = [
("opcode", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("count", ctypes.c_uint32),
("ival1_tv_sec", ctypes.c_long),
("ival1_tv_usec", ctypes.c_long),
("ival2_tv_sec", ctypes.c_long),
("ival2_tv_usec", ctypes.c_long),
("can_id", ctypes.c_uint32),
("nframes", ctypes.c_uint32),
]
BcmMsgHead = bcm_header_factory(fields)

expected_fields = [
("opcode", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("count", ctypes.c_uint32),
("ival1_tv_sec", ctypes.c_long),
("ival1_tv_usec", ctypes.c_long),
("ival2_tv_sec", ctypes.c_long),
("ival2_tv_usec", ctypes.c_long),
("can_id", ctypes.c_uint32),
("nframes", ctypes.c_uint32),
# We expect 4 bytes of padding
("pad_0", ctypes.c_uint8),
("pad_1", ctypes.c_uint8),
("pad_2", ctypes.c_uint8),
("pad_3", ctypes.c_uint8),
]
self.assertEqual(expected_fields, BcmMsgHead._fields_)

@patch("ctypes.sizeof")
@patch("ctypes.alignment")
def test_bcm_header_factory_64_bit_sizeof_long_4_alignof_long_4(
self, ctypes_sizeof, ctypes_alignment
):
"""This tests a 64-bit platform (ex. Ubuntu 18.04 on x86_64), where:

* sizeof(long) == 8
* sizeof(long long) == 8
* alignof(long) == 8
* alignof(long long) == 8
"""

def side_effect_ctypes_sizeof(value):
type_to_size = {
ctypes.c_longlong: 8,
ctypes.c_long: 8,
ctypes.c_uint8: 1,
ctypes.c_uint16: 2,
ctypes.c_uint32: 4,
ctypes.c_uint64: 8,
}
return type_to_size[value]

def side_effect_ctypes_alignment(value):
type_to_alignment = {
ctypes.c_longlong: 8,
ctypes.c_long: 8,
ctypes.c_uint8: 1,
ctypes.c_uint16: 2,
ctypes.c_uint32: 4,
ctypes.c_uint64: 8,
}
return type_to_alignment[value]

ctypes_sizeof.side_effect = side_effect_ctypes_sizeof
ctypes_alignment.side_effect = side_effect_ctypes_alignment

fields = [
("opcode", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("count", ctypes.c_uint32),
("ival1_tv_sec", ctypes.c_long),
("ival1_tv_usec", ctypes.c_long),
("ival2_tv_sec", ctypes.c_long),
("ival2_tv_usec", ctypes.c_long),
("can_id", ctypes.c_uint32),
("nframes", ctypes.c_uint32),
]
BcmMsgHead = bcm_header_factory(fields)

expected_fields = [
("opcode", ctypes.c_uint32),
("flags", ctypes.c_uint32),
("count", ctypes.c_uint32),
# We expect 4 bytes of padding
("pad_0", ctypes.c_uint8),
("pad_1", ctypes.c_uint8),
("pad_2", ctypes.c_uint8),
("pad_3", ctypes.c_uint8),
("ival1_tv_sec", ctypes.c_long),
("ival1_tv_usec", ctypes.c_long),
("ival2_tv_sec", ctypes.c_long),
("ival2_tv_usec", ctypes.c_long),
("can_id", ctypes.c_uint32),
("nframes", ctypes.c_uint32),
]
self.assertEqual(expected_fields, BcmMsgHead._fields_)


if __name__ == "__main__":
unittest.main()