In [1]:
import time
import serial
import struct
import matplotlib.pyplot as plt
import IPython
import threading

In [2]:
def cobs_encode(buf):
    if len(buf) == 0:
        return b"\x00"
    if buf[0] != 0:
        buf = b"\x00" + buf
        return cobs_encode(buf)
    # buf[0] must be 0 here
    assert buf[0] == 0
    a, sep, b = buf[1:].partition(b"\x00")
    return bytes([len(a)+1])+a+cobs_encode(sep + b)

In [3]:
def cobs_decode(buf):
    if buf == b"":
        return b""
    if buf == b"\x00":
        return b""
    out = bytearray(buf[0])
    out[1:] = buf[1:len(out)]
    return out + cobs_decode(buf[len(out):])

In [4]:
def get_imu_reading(buf):
    if buf[:3] != "IMU":
        return None
    buf = buf[3:]
    if len(buf) != 12:
        return None
    imur = struct.unpack("<hhhhhh", buf)
    return imur

In [5]:
class COBSSerial:
    def __init__(self, port):
        self.port = serial.Serial(None, 115200, timeout=1)
        self.port.port = port
        self.subscribers = {
            "IMU": get_imu_reading,
        }
        self.port_lock = threading.RLock()
        self.read_thread = None
        self.stop = False
    def begin(self):
        self.port.open()
        time.sleep(0.1)
        with self.port_lock:
            self.port.write(range(64)) # Get out of bootloader
        if self.read_thread is None:
            self.read_thread = threading.Thread(
            target=self.read_thread_fun)
        self.read_thread.start()
    def end(self):
        self.stop = True
        self.read_thread.join()
        self.read_thread = None
        self.port.close()
    def get_packet(self):
        with self.port_lock:
            buf = self.port.read_until(b'\x00')
        buf = cobs_decode(buf)
        return buf[1:]
    def read_thread_fun(self):
        while not self.stop:
            buf = self.get_packet()
            tag = buf[:3].decode("cp437")
            if tag in self.subscribers:
                if "__iter__" not in dir(self.subscribers[tag]):
                    # Just call the function itself
                    self.subscribers[tag](buf)
                else:
                    for f in self.subscribers[tag]:
                        f(buf)
    def write(self, tag, payload_bytes):
        tag = tag.encode("cp437")[:3]
        payload = tag + payload_bytes
        buf = cobs_encode(payload)
        with self.port_lock:
            self.port.write(buf)
            self.port.flush()

In [6]:
cs = COBSSerial("COM4")
cs.begin()
start = time.monotonic()
while time.monotonic() - start < 10:
    cs.write("OUT", struct.pack("<h", int(1000+(time.monotonic() - start)*100))*8)
    time.sleep(0.05)
cs.end()