In [1]:
from typing import List
import serial
import time
import numpy as np

class Ring:
    """Reader for one ring of range sensors that streams packets over a serial port.

    Packet layout, as parsed by ``process`` (offsets are bytes within one packet):

    * 0-1: two ``0xFF`` start bytes
    * 2: length byte; the whole packet is ``length + 2`` bytes long
    * 3: number of sensors ``N``
    * 4 onward: ``N`` range values, ``_range_datatype_len`` bytes each,
      least-significant byte first
    * then ``_clock_datatype_len`` bytes of clock value, least-significant byte first
    * last byte of the packet: checksum
    """

    def __init__(self, port = 'COM3', baud = 115200, timeout = 0.1):
        """Open the serial port and set up empty receive buffers.

        ``port``, ``baud`` and ``timeout`` are passed straight to ``serial.Serial``.
        """
        self._serial = serial.Serial(port, baud, timeout = timeout)
        self._buf = bytearray(b'')  # received bytes not yet consumed by process()
        self._msg = bytearray(b'')  # most recent complete packet cut out of _buf
        self._range_datatype_len = 2
        self._clock_datatype_len = 4

        # Result of the latest process() call; [[]] means "nothing decoded".
        self.measurements = [[]]

    def close(self):
        """Close the underlying serial port."""
        self._serial.close()

    def process(self) -> List[List[float]]:
        """Read all pending serial bytes and decode every complete packet.

        Bytes of an incomplete trailing packet stay buffered for the next call.
        Packets that fail validation are reported with ``print`` and dropped;
        only the bytes of the bad packet itself are discarded.

        Returns:
            One row per valid packet, in arrival order, each of the form
            ``[range_0, ..., range_{N-1}, clock]`` (all floats). When no valid
            packet was decoded the result is ``[[]]``, so indexing the result
            with ``[-1]`` is always safe. The same list is stored in
            ``self.measurements``.
        """
        self.measurements = [[]]
        self._buf += self._serial.read(self._serial.in_waiting)

        rows = []
        while len(self._buf) >= 3:
            if self._buf[0] != 0xFF or self._buf[1] != 0xFF:
                # Not at a packet start: drop one byte and look again.
                del self._buf[0]
                continue

            packet_length = self._buf[2] + 2
            if len(self._buf) < packet_length:
                # The rest of this packet has not arrived yet.
                break

            # Cut the packet out of the buffer exactly once, whatever its fate.
            self._msg = self._buf[:packet_length]
            del self._buf[:packet_length]

            row = self._decode_packet(self._msg)
            if row is not None:
                rows.append(row)

        if rows:
            self.measurements = rows
        return self.measurements

    def _decode_packet(self, msg):
        """Validate one complete packet; return its row, or None if it is invalid."""
        header_len = 4  # two start bytes + length byte + sensor count
        if len(msg) < header_len:
            # Declared length is too small to even hold the sensor count.
            print("Corrupted message! Skip measurement.")
            return None

        sensors_num = msg[3]
        data_length = self._range_datatype_len * sensors_num
        clock_start = header_len + data_length
        clock_end = clock_start + self._clock_datatype_len

        if clock_end > len(msg):
            # The declared sensor count does not fit into the declared packet.
            print("Too long message! Skip measurement.")
            return None

        # Checksum: complement of the low byte of the sum of the covered bytes.
        # NOTE(review): the covered range is kept exactly as before; with a
        # 4-byte clock it stops one byte short of the end of the clock field.
        # Confirm against the sender before changing it.
        checksum_end = data_length + self._clock_datatype_len + 3
        crc = ~sum(msg[2:checksum_end]) & 0xFF
        if crc != msg[-1]:
            print("Corrupted message! Skip measurement.")
            return None

        width = self._range_datatype_len
        ranges = [
            float(int.from_bytes(msg[header_len + k * width:header_len + (k + 1) * width], "little"))
            for k in range(sensors_num)
        ]
        clock = float(int.from_bytes(msg[clock_start:clock_end], "little"))
        return ranges + [clock]

    def readMeasurements(self) -> List[int]:
        """Placeholder: not implemented yet, currently returns None."""
        pass

In [2]:
import threading


class RepeatTimer(threading.Timer):
    """Timer that keeps firing: calls ``function`` once per ``interval`` until cancelled."""

    def run(self):
        """Thread body: wait one interval, call the function, repeat until cancelled."""
        while True:
            # wait() gives True once the finished event is set, False on timeout.
            stopped = self.finished.wait(self.interval)
            if stopped:
                return
            self.function(*self.args, **self.kwargs)
            
            
class WoodChecker:
    """Periodically polls two ring readers and collects complete combined scans.

    Every ``rate`` seconds ``check_scan`` takes the newest row from each ring,
    joins the range values of both, scales them by 1/1000 and stores the result
    if it has the expected number of values.
    """

    # A combined scan is stored only when it holds exactly this many values.
    # NOTE(review): presumably 12 sensors per half ring - confirm with hardware.
    EXPECTED_SCAN_SIZE = 24

    def __init__(self, maximum_distance: float, rate: float, right_serial: "Ring", left_serial: "Ring"):
        """
        maximum_distance: kept on the instance; not used by this class itself
        rate: interval in seconds between two polls of the rings
        right_serial: reader of the right half ring (must provide ``process()``)
        left_serial: reader of the left half ring (must provide ``process()``)
        """
        self.maximum_distance = maximum_distance
        self._rate = rate
        self._right_serial = right_serial
        self._left_serial = left_serial
        self.scans_buffer = []
        self.timer = RepeatTimer(rate, self.check_scan)

    def check_scan(self):
        """Poll both rings once and store the combined scan if it is complete."""
        # process() returns [[]] when nothing was decoded, so [-1] is then [].
        left_scan = self._left_serial.process()[-1]
        right_scan = self._right_serial.process()[-1]
        invited_scans = self.invite_scans(left_scan, right_scan)
        converted_scans = self.convert_to_meters(invited_scans)
        if len(converted_scans) == self.EXPECTED_SCAN_SIZE:
            print(converted_scans)
            self.scans_buffer.append(converted_scans)

    def invite_scans(self, left_scan, right_scan):
        """Join both scans into one list, dropping the last element of each.

        The last element of a ring row is its clock value, not a range.
        """
        return left_scan[:-1] + right_scan[:-1]

    def convert_to_meters(self, scans):
        """Divide every value by 1000 (presumably millimetres to metres)."""
        return [item / 1000 for item in scans]

    @property
    def measurements(self):
        """All scans stored so far (the live buffer, not a copy)."""
        return self.scans_buffer

    def start_scanning(self):
        """Start the periodic polling in a background thread."""
        self.timer.start()

    def stop_scanning(self):
        """Stop the periodic polling and get ready for another start.

        A thread object can be started only once, so the used timer is replaced
        by a fresh one; otherwise the next start_scanning() would raise
        RuntimeError.
        """
        self.timer.cancel()
        self.timer = RepeatTimer(self._rate, self.check_scan)

In [3]:
import typing as tp
import math


def calculate_volume(
    points_scans: list,
    l: tp.Union[float, list],
    alpha: float = 360 / 24,
    is_radians: bool = False
) -> float:
    """
    Estimate the volume enclosed by a stack of radial scans.

    Each pair of neighbouring planes and neighbouring sensors (the last sensor
    is paired with the first one, closing the ring) contributes one wedge.

    points_scans: array [plane, scans] of radii
    l: length between planes; one number for all gaps, or a list indexed by plane
    alpha: angle between neighbouring sensors (default: 360 / 24 = 15 degrees)
    is_radians: True if alpha is already in radians (default: False)
    """
    step_angle = alpha if is_radians else alpha * (math.pi / 180)
    gaps = [l] * len(points_scans) if isinstance(l, (int, float)) else l

    accumulated = 0.0
    for plane in range(len(points_scans) - 1):
        # Starting at -1 pairs the last sensor with the first one.
        for sensor in range(-1, len(points_scans[plane]) - 1):
            near_a = points_scans[plane][sensor]
            far_a = points_scans[plane + 1][sensor]
            near_b = points_scans[plane][sensor + 1]
            far_b = points_scans[plane + 1][sensor + 1]

            wedge = (
                2 * near_a * near_a +
                2 * far_a * far_a +
                2 * near_b * near_b +
                2 * far_b * far_b +
                2 * near_a * near_b +
                2 * near_a * far_a +
                2 * far_a * far_b +
                2 * near_b * far_b +
                near_a * far_b +
                far_a * near_b
            )
            accumulated += gaps[plane] * wedge

    return accumulated * (step_angle / 36)

In [4]:
# Set the serial port name of each half ring for your machine here
# (examples: /dev/ttyUSB0, /dev/ttyACM1, COM6).
right_half_master = Ring(port = 'COM6', baud = 115200, timeout = 0.1)
left_half_slave = Ring(port = 'COM7', baud = 115200, timeout = 0.1)

In [5]:
# Arguments: maximum_distance = 1, rate = 0.1 (timer interval), right ring, left ring.
checker = WoodChecker(1, 0.1, right_half_master, left_half_slave)

In [None]:
# Manual check: read and decode whatever the left ring has sent so far.
left_half_slave.process()

In [None]:
# Manual check: read and decode whatever the right ring has sent so far.
right_half_master.process()

In [None]:
# Collect scans for two seconds, then estimate the volume from them.
checker.start_scanning()
try:
    time.sleep(2.0)
finally:
    # Stop the background timer even if the cell is interrupted, so it does
    # not keep polling the serial ports after the cell has ended.
    checker.stop_scanning()
print(calculate_volume(checker.measurements, 1))

In [None]:
# Recompute the volume from all scans stored so far, with length 1 between planes.
calculate_volume(checker.measurements, 1)

In [None]:
# Scratch check: element-wise division of a list by 10.
a = list(range(1, 5))
print(list(map(lambda i: i / 10, a)))

In [None]:
# Manual check: print the rows decoded from the right ring's pending data.
print(right_half_master.process())

In [None]:
# Manual check: print the rows decoded from the left ring's pending data.
print(left_half_slave.process())