This notebook is trying to figure out CAN connections via the Python-CAN library and the interface library for INNOMAKER.

In [2]:
import serial.tools.list_ports

ports = serial.tools.list_ports.comports()
for port in ports:
    print(port.device)

COM3


In [3]:
#!/usr/bin/env python

"""
This example shows how sending a single message works.
"""

import can
import time

def send_one(com_port):
    """Sends a single message."""
    # this uses the default configuration (for example from the config file)
    # see https://python-can.readthedocs.io/en/stable/configuration.html
        # Using specific buses works similar:
    bus = can.interface.Bus(interface='seeedstudio',
                            channel=com_port,
                            baudrate=2000000,
                            bitrate=500000)
    time.sleep(0.1)  

    msg = can.Message(
        arbitration_id=0xC0FFEE, data=[0, 25, 0, 1, 3, 1, 4, 1], is_extended_id=True
    )

    try:
        bus.send(msg)
        print(f"Message sent on {bus.channel_info}")
        time.sleep(1)  # wait for 1 second before sending the next message
    except can.CanError:
        print("Message NOT sent")

    task = bus.send_periodic(msg, 0.001)
    assert isinstance(task, can.CyclicSendTaskABC)

    # finally:
    #     bus.shutdown()
    #     print("Bus shutdown successfully")
        
# Call the function with the correct COM port
send_one('COM3')

Message sent on Serial interface: COM3


In [4]:
# import the library
import can

# create a bus instance using 'with' statement,
# this will cause bus.shutdown() to be called on the block exit;
# many other interfaces are supported as well (see documentation)
with can.Bus(interface='seeedstudio',
            channel='COM3',
            baudrate=2000000,
            bitrate=500000) as bus:

   # send a message
   message = can.Message(arbitration_id=123, is_extended_id=True,
                         data=[0x11, 0x22, 0x33])
   bus.send(message, timeout=0.2)

   # iterate over received messages
   for msg in bus:
       print(f"{msg.arbitration_id:X}: {msg.data}")

   # or use an asynchronous notifier
   notifier = can.Notifier(bus, [can.Logger("recorded.log"), can.Printer()])

SerialException: could not open port 'COM3': PermissionError(13, 'Access is denied.', None, 5)

In [8]:
#!/usr/bin/env python

"""
This example exercises the periodic sending capabilities.

Expects a vcan0 interface:

    python3 -m examples.cyclic

"""

import logging
import time

import can

logging.basicConfig(level=logging.INFO)


def simple_periodic_send(bus):
    """
    Sends a message every 20ms with no explicit timeout
    Sleeps for 2 seconds then stops the task.
    """
    print("Starting to send a message every 200ms for 2s")
    msg = can.Message(
        arbitration_id=0x123, data=[1, 2, 3, 4, 5, 6], is_extended_id=False
    )
    task = bus.send_periodic(msg, 0.001)
    assert isinstance(task, can.CyclicSendTaskABC)
    time.sleep(60)
    task.stop()
    print("stopped cyclic send")


def limited_periodic_send(bus):
    """Send using LimitedDurationCyclicSendTaskABC."""
    print("Starting to send a message every 200ms for 1s")
    msg = can.Message(
        arbitration_id=0x12345678, data=[0, 0, 0, 0, 0, 0], is_extended_id=True
    )
    task = bus.send_periodic(msg, 0.20, 1, store_task=False)
    if not isinstance(task, can.LimitedDurationCyclicSendTaskABC):
        print("This interface doesn't seem to support LimitedDurationCyclicSendTaskABC")
        task.stop()
        return

    time.sleep(2)
    print("Cyclic send should have stopped as duration expired")
    # Note the (finished) task will still be tracked by the Bus
    # unless we pass `store_task=False` to bus.send_periodic
    # alternatively calling stop removes the task from the bus
    # task.stop()


def test_periodic_send_with_modifying_data(bus):
    """Send using ModifiableCyclicTaskABC."""
    print("Starting to send a message every 200ms. Initial data is four consecutive 1s")
    msg = can.Message(arbitration_id=0x0CF02200, data=[1, 1, 1, 1])
    task = bus.send_periodic(msg, 0.20)
    if not isinstance(task, can.ModifiableCyclicTaskABC):
        print("This interface doesn't seem to support modification")
        task.stop()
        return
    time.sleep(2)
    print("Changing data of running task to begin with 99")
    msg.data[0] = 0x99
    task.modify_data(msg)
    time.sleep(2)

    task.stop()
    print("stopped cyclic send")
    print("Changing data of stopped task to single ff byte")
    msg.data = bytearray([0xFF])
    msg.dlc = 1
    task.modify_data(msg)
    time.sleep(1)
    print("starting again")
    task.start()
    time.sleep(1)
    task.stop()
    print("done")


# Will have to consider how to expose items like this. The socketcan
# interfaces will continue to support it... but the top level api won't.
# def test_dual_rate_periodic_send():
#     """Send a message 10 times at 1ms intervals, then continue to send every 500ms"""
#     msg = can.Message(arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5])
#     print("Creating cyclic task to send message 10 times at 1ms, then every 500ms")
#     task = can.interface.MultiRateCyclicSendTask('vcan0', msg, 10, 0.001, 0.50)
#     time.sleep(2)
#
#     print("Changing data[0] = 0x42")
#     msg.data[0] = 0x42
#     task.modify_data(msg)
#     time.sleep(2)
#
#     task.stop()
#     print("stopped cyclic send")
#
#     time.sleep(2)
#
#     task.start()
#     print("starting again")
#     time.sleep(2)
#     task.stop()
#     print("done")


def main():
    """Test different cyclic sending tasks."""
    reset_msg = can.Message(
        arbitration_id=0x00, data=[0, 0, 0, 0, 0, 0], is_extended_id=False
    )

    # this uses the default configuration (for example from environment variables, or a
    # config file) see https://python-can.readthedocs.io/en/stable/configuration.html
    bus = can.interface.Bus(interface='seeedstudio',
                            channel='COM3',
                            baudrate=2000000,
                            bitrate=500000)
    bus.send(reset_msg)

    simple_periodic_send(bus)

    bus.send(reset_msg)

    limited_periodic_send(bus)

    test_periodic_send_with_modifying_data(bus)

    # print("Carrying out multirate cyclic test for {} interface".format(interface))
    # can.rc['interface'] = interface
    # test_dual_rate_periodic_send()

    time.sleep(2)


if __name__ == "__main__":
    main()

Starting to send a message every 200ms for 2s
stopped cyclic send
Starting to send a message every 200ms for 1s
Cyclic send should have stopped as duration expired
Starting to send a message every 200ms. Initial data is four consecutive 1s
Changing data of running task to begin with 99
stopped cyclic send
Changing data of stopped task to single ff byte
starting again
done
