# python-can CAN over serial

[```python-can```](https://python-can.readthedocs.io/en/2.1.0/index.html) includes a [CAN over Serial](
https://python-can.readthedocs.io/en/2.1.0/interfaces/serial.html
) module.

This allows prototyping and testing with inexpensive Arduino devices before switching to another module with minimal code changes.

The packet over serial



|                | Length (Byte) | Data type               | Byte order    | Description                                   |
|----------------|---------------|-------------------------|---------------|-----------------------------------------------|
| Start of frame | 1             | Byte                    | -             | Must be 0xAA                                  |
| Timestamp      | 4             | Unsigned 4 byte integer | Little-Endian | Usually s, ms or µs since start of the device |
| DLC            | 1             | Unsigned 1 byte integer | Little-Endian | Length in byte of the payload                 |
| Arbitration ID | 4             | Unsigned 4 byte integer | Little-Endian | -                                             |
| Payload        | 0 - 8         | Byte                    | -             | -                                             |
| End of frame   | 1             | Byte                    | -             | Must be 0xBB                                  |

In [11]:
# Imports for build.
import glob
import uuid
import jinja2
import os
import re
import time
import datetime

In [12]:
checkpoints=list()

In [14]:
now=datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
checkpoints.append(now)
print(now)

2018-05-16T20:38:38.706772+00:00


In [15]:
old_sketches = glob.glob("*.ino")
old_sketches

[]

In [16]:
for old_sketch in old_sketches:
    print("Deleting: {}".format(old_sketch))
    os.unlink(old_sketch)

## Universally unique identifier

> From Wikipedia, the free encyclopedia
A universally unique identifier (UUID) is a 128-bit number used to identify information in computer systems. The term globally unique identifier (GUID) is also used.

> When generated according to the standard methods, UUIDs are for practical purposes unique, without depending for their uniqueness on a central registration authority or coordination between the parties generating them, unlike most other numbering schemes. While the probability that a UUID will be duplicated is not zero, it is close enough to zero to be negligible.

> Thus, anyone can create a UUID and use it to identify something with near certainty that the identifier does not duplicate one that has already been, or will be, created to identify something else. **Information labeled with UUIDs by independent parties can therefore be later combined into a single database, or transmitted on the same channel, without needing to resolve conflicts between identifiers**.

> Adoption of UUIDs and GUIDs is widespread, with many computing platforms providing support for generating them, and for parsing their textual representation.

- https://en.wikipedia.org/wiki/Universally_unique_identifier

In [88]:
build_uuid = str(uuid.uuid4())

In [89]:
arduino_template_str = """
unsigned long BAUD = {{ baud }};
unsigned long DELAY = {{ delay }};
char BUILD_UUID[] = "{{ build_uuid }}";

unsigned long arbitration_id = 0x0;
unsigned long time_millis = 0b0;
unsigned long timestamp = 0x0;

uint8_t counter8 = 0x00;
uint16_t counter16 = 0x0000;
uint32_t counter32 = 0x00000000;

// Arduino Setup
void setup() {
Serial.begin(BAUD);
Serial.print("Build UUID: ");
Serial.println(BUILD_UUID);

// Start of frame
Serial.write(0xAA);

// Timestamp
Serial.write(0x00);
Serial.write(0x00);
Serial.write(0x00);
Serial.write(0x00);

// DLC
Serial.write(0x00);

// Arbitration ID
Serial.write(0x00);
Serial.write(0x00);
Serial.write(0x00);
Serial.write(0x00);

// End of frame
Serial.write(0xBB);
return; 
}

// Arduino Main Loop
void loop() {

/* Begin timestamp data packet */
time_millis = millis();
// Start of frame
Serial.write(0xAA);

// Timestamp
timestamp = time_millis;
for(char shift=0;shift<32;shift+=8) {
Serial.write((unsigned long)timestamp>>shift&0b11111111);
}

// DLC
Serial.write(0x04);
// Arbitration ID
arbitration_id=1;
for(char shift=0;shift<32;shift+=8) {
Serial.write((unsigned long)arbitration_id>>shift&0xFF);
}

// Payload
for(char shift=0;shift<32;shift+=8) {
Serial.write((unsigned long)time_millis>>shift&0b11111111);
}

// End of Frame
Serial.write(0xBB);
/* End timestamp data packet */
}
"""
arduino_template = jinja2.Template(arduino_template_str)

In [90]:
now=datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
checkpoints.append(now)
print(now)

2018-05-16T21:39:51.676242+00:00


In [49]:
sketch_cfg=dict()
sketch_cfg["baud"] = 115200
sketch_cfg["delay"]= 1000
sketch_cfg["arbitration_id"]=12
sketch_cfg["build_uuid"]=build_uuid

arduino_sketch = (arduino_template.render(**sketch_cfg))
print(arduino_sketch)
sketch_file = "{}.ino".format(build_uuid)
with open(sketch_file, "w") as fid:
    fid.write(arduino_sketch)


unsigned long BAUD = 115200;
unsigned long DELAY = 1000;
char BUILD_UUID[] = "b042c137-95d3-4935-b1a1-15d12e992d1c";

unsigned long arbitration_id = 0x0;
unsigned long time_millis = 0b0;
unsigned long timestamp = 0x0;

uint8_t counter8 = 0x00;
uint16_t counter16 = 0x0000;
uint32_t counter32 = 0x00000000;

// Arduino Setup
void setup() {
Serial.begin(BAUD);
Serial.print("Build UUID: ");
Serial.println(BUILD_UUID);

// Start of frame
Serial.write(0xAA);

// Timestamp
Serial.write(0x00);
Serial.write(0x00);
Serial.write(0x00);
Serial.write(0x00);

// DLC
Serial.write(0x00);

// Arbitration ID
Serial.write(0x00);
Serial.write(0x00);
Serial.write(0x00);
Serial.write(0x00);

// End of frame
Serial.write(0xBB);
return; 
}

// Arduino Main Loop
void loop() {

/* Begin timestamp data packet */
time_millis = millis();
// Start of frame
Serial.write(0xAA);

// Timestamp
timestamp = time_millis;
for(char shift=0;shift<32;shift+=8) {
Serial.write((unsigned long)timestamp>>shift&0b11111111);
}

// 

## Traceability

In [52]:
import hashlib

Hash of the source code, add to blockchains for proof-of-existence.

Immutable store if investigators want to find exactly when a flash file was built.

In [82]:
for algo in hashlib.algorithms_guaranteed:
    sketch_hash = getattr(hashlib, algo)(arduino_sketch.encode("UTF-8"))
    try:
        print("{}: {}".format(algo, sketch_hash.hexdigest()))
    except:
        pass

sha1: 6497a50ddaa294434d678e4dfd6a07a4fc88c1c4
sha3_384: cab792aa40fda66fef0a6248ce592fa0e5106a6ca832668a88ada8b7073ad897c420e93ede96f5823a00b3bb510556f0
sha3_512: b4c30bf92779fee855d084a97d50ec0ccacd3822dd1c2ee81d14c9387f8b2b5a1e7386076f5729c0de3fdac8cda9338f00b555f806d80201bb3c0c515b2a8a27
sha3_256: 7219b8737c492a9c6a45fc8ca720bb7b3a59629510b484add82842f992d28175
blake2b: 38cb52fa1f50561aa181ea5222ee2f3b84db27ddc9f87817c6dc9b6f8191e8315cb558b5f0b85ebcab90ea121e52d13e88ea56508d295138f1871014a7cded5d
sha384: e3918f0aa0b2014a69b21b2c2004f9df03cde4e0c831084dcbb1781fe58531e47f1f2896bfa26782f14a60a45bf3b2c4
sha224: 4228c36febd8cf740383e729812da9407a3ce90efb1d06ab383d4088
sha256: 286ffc985581e4ca3976ee581569bb1196985a57d139e717ace65bbda2dca4ce
sha512: 4b8e69e6160a9f5c56ea3aa77d7ec08cae47635206dca96b5e71de302b9dbd93835e59e70a9225d9047bdc6896f67278484df4929af94a2317af7c8083be340a
sha3_224: 09c7b0088deb5b77b7e0db3c42cd8e2d28ecb85faaad15b266bcbd25
blake2s: eb2c3a1b9d575c66f851e45ceb99aecee966a6

In [84]:
now=datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
checkpoints.append(now)
print(now)

2018-05-16T21:03:21.222163+00:00


## Formatting.

Format the sketch to adhere to LLVM style guidelines.

In [87]:
import subprocess
subprocess.check_output(["clang-format-6.0", "-style=LLVM", "-i", sketch_file])

b''

In [86]:
now=datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
checkpoints.append(now)
print(now)

2018-05-16T21:03:23.010261+00:00


# Build Process

## Build & Upload Flash File

In [33]:
import serial
import time

In [34]:
try:
    stdout = subprocess.check_output(["make", "upload"]).decode()
    print(stdout)
except subprocess.CalledProcessError as err:
    print("Build Failed:")
    print(err)
except:
    raise

-------------------------
Arduino.mk Configuration:
- [AUTODETECTED]       CURRENT_OS = LINUX 
- [AUTODETECTED]       ARDUINO_DIR = /usr/share/arduino 
- [COMPUTED]           ARDMK_DIR = /projects/arduino_IO/arduino_make (relative to Common.mk)
- [AUTODETECTED]       ARDUINO_VERSION = 105 
- [DEFAULT]            ARCHITECTURE =  
- [DEFAULT]            ARDMK_VENDOR = arduino 
- [AUTODETECTED]       ARDUINO_PREFERENCES_PATH = /mnt/ubuntu1604_2/home/jed/.arduino/preferences.txt 
- [AUTODETECTED]       ARDUINO_SKETCHBOOK = /mnt/ubuntu1604_2/home/jed/sketchbook (from arduino preferences file)
- [BUNDLED]            AVR_TOOLS_DIR = /usr/share/arduino/hardware/tools/avr (in Arduino distribution)
- [COMPUTED]           ARDUINO_LIB_PATH = /usr/share/arduino/libraries (from ARDUINO_DIR)
- [COMPUTED]           ARDUINO_VAR_PATH = /usr/share/arduino/hardware/arduino//variants (from ARDUINO_DIR)
- [COMPUTED]           BOARDS_TXT = /usr/share/arduino/hardware/arduino//boards.txt (from ARDUINO_DIR)
- 

In [35]:
now=datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
checkpoints.append(now)
print(now)

2018-05-16T20:41:34.977254+00:00


## Build Process Tests

Make sure that all serial ports from the build and upload process are the same.

In [36]:
serial_ports = re.compile("(/dev/ttyUSB[\d]+)").findall(stdout)
serial_ports

['/dev/ttyUSB2', '/dev/ttyUSB2', '/dev/ttyUSB2']

In [38]:
for i in range(len(serial_ports)-1):
    assert (serial_ports[i] == serial_ports[i+1])

In [39]:
now=datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
checkpoints.append(now)
print(now)

2018-05-16T20:42:11.011021+00:00


# Serial V&V Testing

In [40]:
import serial
import time

In [41]:
ser = serial.Serial(
    port=serial_ports[0], baudrate=sketch_cfg["baud"],
    bytesize=serial.EIGHTBITS,
    parity=serial.PARITY_NONE,
    stopbits=serial.STOPBITS_ONE,
    timeout=10,
    xonxoff=0,
    rtscts=0
)

In [43]:
now=datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
checkpoints.append(now)
print(now)

2018-05-16T20:43:32.487003+00:00


### Build & Flash UUID verification

Assert that the UUID generated from the build script is the same one that is in the flash file.

In [44]:
ser.setDTR(False) # Drop DTR
time.sleep(1)   # Read somewhere that 22ms is what the UI does.
ser.flushInput()
ser.flushOutput()
ser.flush()
ser.setDTR(True)  # UP the DTR back
header = ser.readline()
header

b'\xcaE\x00\x00\x04\x01\x00\x00\x00\xcaE\x00\x00\xfbBuild UUID: b042c137-95d3-4935-b1a1-15d12e992d1c\r\n'

In [45]:
header_uuid = header.split(b"UUID:")[1].strip().decode("UTF-8")

In [46]:
build_uuid

'b042c137-95d3-4935-b1a1-15d12e992d1c'

In [47]:
header_uuid

'b042c137-95d3-4935-b1a1-15d12e992d1c'

In [48]:
assert build_uuid == header_uuid

In [None]:
now=datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
checkpoints.append(now)
print(now)

In [None]:
ser.setDTR(False) # Drop DTR
time.sleep(.25)   # Read somewhere that 22ms is what the UI does.
ser.flushInput()
ser.flushOutput()
ser.flush()
ser.setDTR(True)  # UP the DTR back
time.sleep(2.5)
header_packet = ser.readline()
header_packet

In [None]:
ser.read_until(b"\xAA")
for x in range(10):
    packet = ser.read_until(b"\xBB")
    print(packet)

In [None]:
packet = ser.read_until(b"\xBB")
packet

In [None]:
packet = ser.read_until(b"\xBB")
packet

In [None]:
packet[1:5]

In [None]:
timestamp_raw = packet[1:5]
timestamp_raw

In [None]:
int.from_bytes(timestamp_raw, 'little')

In [None]:
int.from_bytes(timestamp_raw, 'big')

In [None]:
timestamp = int.from_bytes(timestamp_raw, 'little')
timestamp

In [None]:
sof = packet[0]
sof

In [None]:
int.from_bytes(b'\xAA', 'little')

In [None]:
int.from_bytes(b'\xAA', 'big')

In [None]:
assert sof == int.from_bytes(b'\xAA', 'little')

In [None]:
dlc = packet[5]
dlc

In [None]:
arbitration_id_raw = packet[6:10]
arbitration_id_raw

In [None]:
int.from_bytes(arbitration_id_raw, 'little')

In [None]:
int.from_bytes(arbitration_id_raw, 'big')

In [None]:
data = packet[11]
data 

In [None]:
sketch_cfg

In [None]:
header_uuid

In [None]:
sketch_cfg["build_uuid"]

In [None]:
assert str(header_uuid) == header_uuid

In [None]:
assert str(sketch_cfg["build_uuid"]) == sketch_cfg["build_uuid"]

In [None]:
assert header_uuid == sketch_cfg["build_uuid"]

In [None]:
ser.close()

In [None]:
import can
can.__version__

In [None]:
bus_cfg = dict()
bus_cfg["bustype"] = "serial"
bus_cfg["channel"]=serial_ports[0]
bus_cfg["bitrate"]=115200

bus = can.interface.Bus(**bus_cfg)   
bus.ser.read_until(b"\xBB")

In [None]:
packet = bus.recv()
packet

In [None]:
packet.timestamp

In [None]:
packet.arbitration_id

In [None]:
packet.dlc

In [None]:
packet.data

In [None]:
int.from_bytes(packet.data, 'little')

In [None]:
int.from_bytes(packet.data, 'big')

In [None]:
packet.timestamp

In [None]:
packets = list()
while len(packets)<100:
    packet = bus.recv()
    if packet is None:
        time.sleep(sketch_cfg["delay"])
    packets.append(packet)

In [None]:
packets

In [None]:
class Data(object):
    def __init__(self):
        pass
data = Data()

In [None]:
data.time = [packet.timestamp for packet in packets]
data.time[:10]

In [None]:
data.values = [int.from_bytes(packet.data, 'little') for packet in packets]
data.values[:10]