In [1]:
import serial
import numpy as np
from crc_table import CRC16_Table, CRC8_Table
from imu_data_struct import imu_frame_read, ahrs_frame_read, insgps_frame_read

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu, MagneticField
from std_msgs.msg import String
from geometry_msgs.msg import Pose2D
from tf2_ros import transform_broadcaster



FRAME_HEAD = 0xfc # 252
FRAME_END = 0xfd # 253
TYPE_IMU = 0x40 # 64
TYPE_AHRS = 0x41 # 65
TYPE_INSGPS = 0x42 # 66
TYPE_GROUND = 0xf0 # 240
IMU_LEN = 0x38 #56
AHRS_LEN = 0x30 #48
INSGPS_LEN = 0x54 #84
IMU_MAG_COV = [0.01, 0.01, 0.01]
IMU_GYRO_COV = [0.01, 0.01, 0.01]
IMU_ACCEL_COV = [0.05, 0.05, 0.05]

class IMU_NODE(Node):
    def __init__(self, serial_port:str='/dev/ttyACM0', serial_boudrate:int=921600, device_type:int=1, frist_sn:bool = False, serial_timeout = 20, mag_offset_x = 0, mag_offset_y = 0, mag_offset_z = 0, mag_covariance = 0, debug:bool = False):
        rclpy.init(args=None)
        super().__init__('imu_node')
        self.serial_port_ = serial_port
        self.serial_baudrate_ = serial_boudrate
        self.device_type_ = device_type
        self.frist_sn_ = frist_sn
        self.serial_timeout = serial_timeout
        self.mag_offset_x_ = mag_offset_x
        self.mag_offset_y_ = mag_offset_y
        self.mag_offset_z_ = mag_offset_z
        self.mag_covariance_ = mag_covariance
        self.debug_ = debug

        self.read_sn_ = 0
        self.sn_lost_ = 0
        self.crc_error = 0

        self.imu_frame_ = imu_frame_read()
        self.ahrs_frame_ = ahrs_frame_read()
        self.insgps_frame_ = ahrs_frame_read()

        
        self.imu_pub_ = self.create_publisher(Imu, 'imu', 10)
        self.imu_trueEast_pub_ = self.create_publisher(Imu, 'imu_trueEast', 10)
        self.mag_pose_pub_ = self.create_publisher(Pose2D, 'mag_pose_2d', 10)
        self.mag_pub_ = self.create_publisher(MagneticField, 'magetic_field', 10)

        self.ser = None

        try:
            self.ser = serial.Serial(port=self.serial_port_, baudrate=self.serial_baudrate_)
            if self.ser.is_open: print("serial port initialized")
        except:
            print("Unable to open serial port")


    def checkSN(self, type:int):
        # IMU
        if type == TYPE_IMU:
            self.read_sn_ +=1
            if self.read_sn_ != self.imu_frame_.frame.header.serial_num:
                if self.imu_frame_.frame.header.serial_num < self.read_sn_:
                    self.sn_lost_ += 256 - int(self.read_sn_ - self.imu_frame_.frame.header.serial_num)
                    if self.debug_: print("detected sn lost")
                else:
                    self.sn_lost_ += int(self.imu_frame_.frame.header.serial_num - self.read_sn_)
                    if self.debug_: print("detected sn lost")
            self.read_sn_ = self.imu_frame_.frame.header.serial_num
        # AHRS
        elif type == TYPE_AHRS:
            self.read_sn_ += 1
            if self.read_sn_ != self.ahrs_frame_.frame.header.serial_num:
                if self.ahrs_frame_.frame.header.serial_num < self.read_sn_:
                    self.sn_lost_ += 256 - int(self.read_sn_ - self.ahrs_frame_.frame.header.serial_num)
                    if self.debug_: print("detected sn lost")
                else:
                    self.sn_lost_ += int(self.ahrs_frame_.frame.header.serial_num - self.read_sn_)
                    if self.debug_: print("detect sn lost")
            self.read_sn_ = self.ahrs_frame_.frame.header.serial_num
        # INSGPS
        elif type == TYPE_INSGPS:
            self.read_sn_ += 1
            if self.read_sn_ != self.insgps_frame_.frame.header.serial_num:
                if self.insgps_frame_.frame.header.serial_num < self.read_sn_:
                    self.sn_lost_ += 256 - int(self.read_sn_ - self.insgps_frame_.frame.header.serial_num)
                    if self.debug_: print("detected sn lost")
                else:
                    self.sn_lost_ += int(self.insgps_frame_.frame.header.serial_num - self.read_sn_)
                    if self.debug_: print("detect sn lost")
            self.read_sn_ = self.insgps_frame_.frame.header.serial_num


    def read_serial(self, n_bytes:int=1):
        if n_bytes == 1:
            return int(self.ser.read(1).hex(),16)
        else:
            return list(self.ser.read(n_bytes))

    def processLoop(self):
        print("processLoop is starting")
        while self.ser.is_open:
            check_head = self.read_serial(1)
            # cheack head start
            if check_head != FRAME_HEAD:
                continue
            # check head type
            head_type = self.read_serial(1)
            if head_type != TYPE_IMU and head_type != TYPE_AHRS and head_type != TYPE_INSGPS and head_type != 0x50 and head_type != TYPE_GROUND:
                if self.debug_: print(f"Warning: Head type error: {head_type}")
                continue # Head type error
            # check head length
            check_len = self.read_serial(1)
            if head_type == TYPE_IMU and check_len != IMU_LEN:
                if self.debug_: print("warning: Head length error (imu)")
                continue
            elif head_type == TYPE_AHRS and check_len != AHRS_LEN:
                if self.debug_: print("warning: Head length error (ahrs)")
                continue
            elif head_type == TYPE_INSGPS and check_len != INSGPS_LEN:
                if self.debug_: print("warning: Head length error (insgps)")
                continue
            elif head_type == TYPE_GROUND or head_type == 0x50: # Unknown data, prevent record failure
                ground_sn = self.read_serial(1)
                ground_sn = ground_sn
                self.read_sn_ += 1
                if self.read_sn_ != ground_sn:
                    if ground_sn < self.read_sn_:
                        if self.debug_: print("warning: detect sn lost")
                        self.sn_lost_ += 256 - int(self.read_sn_ - ground_sn)
                        self.read_sn_ = ground_sn
                    else:
                        if self.debug_: print("warning: detect sn lost")
                        self.sn_lost_ += int(ground_sn - self.read_sn_)
                        self.read_sn_ = ground_sn
                ground_ignore =  self.read_serial(check_len+4) 
                continue

            # read head sn
            check_sn = self.read_serial(1)
            head_crc8 = self.read_serial(1)
            head_crc16_H = self.read_serial(1)
            head_crc16_L = self.read_serial(1)
            if self.debug_: 
                print("check_sn: ", check_sn)
                print("head_crc8: ", head_crc8)
                print("head_crc16_H: ", head_crc16_H)
                print("head_crc16_L: ", head_crc16_L)
            # -------------------------------------------------------------   
            # put header and check crc8 and count sn lost
            # CRC8 
            if head_type == TYPE_IMU:
                # print("-----TYPE_IMU----------")
                self.imu_frame_.frame.header.header_start = check_head
                self.imu_frame_.frame.header.data_type = head_type
                self.imu_frame_.frame.header.data_size = check_len
                self.imu_frame_.frame.header.serial_num = check_sn
                self.imu_frame_.frame.header.header_crc8 = head_crc8
                self.imu_frame_.frame.header.header_crc16_h = head_crc16_H
                self.imu_frame_.frame.header.header_crc16_l = head_crc16_L
                CRC8 = CRC8_Table(self.imu_frame_.read_buf.frame_header, 4)
                if CRC8 != self.imu_frame_.frame.header.header_crc8:
                    print("Warning: header_crc8_error (imu)")
                    continue
                if not self.frist_sn_:
                    self.read_sn_ = self.imu_frame_.frame.header.serial_num - 1
                    self.frist_sn_ = True
                # check sn
                self.checkSN(TYPE_IMU)
            elif head_type == TYPE_AHRS:
                # print("-----TYPE_AHRS----------")
                self.ahrs_frame_.frame.header.header_start = check_head
                self.ahrs_frame_.frame.header.data_type = head_type
                self.ahrs_frame_.frame.header.data_size = check_len
                self.ahrs_frame_.frame.header.serial_num = check_sn
                self.ahrs_frame_.frame.header.header_crc8 = head_crc8
                self.ahrs_frame_.frame.header.header_crc16_h = head_crc16_H
                self.ahrs_frame_.frame.header.header_crc16_l = head_crc16_L
                CRC8 = CRC8_Table(self.ahrs_frame_.read_buf.frame_header, 4)
                if CRC8 != self.ahrs_frame_.frame.header.header_crc8:
                    print("Warning: header_crc8_error (ahrs)")
                    continue
                if not self.frist_sn_:
                    self.read_sn_ = self.ahrs_frame_.frame.header.serial_num - 1
                    self.frist_sn_ = True
                # check sn
                self.checkSN(TYPE_AHRS)
            elif head_type == TYPE_INSGPS:
                # print("-----TYPE_INSGPS----------")
                self.insgps_frame_.frame.header.header_start = check_head
                self.insgps_frame_.frame.header.data_type = head_type
                self.insgps_frame_.frame.header.data_size = check_len
                self.insgps_frame_.frame.header.serial_num = check_sn
                self.insgps_frame_.frame.header.header_crc8 = head_crc8
                self.insgps_frame_.frame.header.header_crc16_h = head_crc16_H
                self.insgps_frame_.frame.header.header_crc16_l = head_crc16_L
                CRC8 = CRC8_Table(self.insgps_frame_.read_buf.frame_header, 4)
                if CRC8 != self.insgps_frame_.frame.header.header_crc8:
                    print("Warning: header_crc8_error (insgps)")
                    continue
                elif self.debug_:
                    print("header_crc8 matched")
                # check sn
                self.checkSN(TYPE_AHRS)
            # -------------------------------------------------------------------------
            print("CRC8 passed")
            # 2. CRC16
            if head_type == TYPE_IMU:
                head_crc16_l = self.imu_frame_.frame.header.header_crc16_l
                head_crc16_h = self.imu_frame_.frame.header.header_crc16_h
                head_crc16 = head_crc16_l + (head_crc16_h << 8)
                for i in range(IMU_LEN+1): # 56+1
                    self.imu_frame_.read_buf.read_msg[i] = self.read_serial(1)
                    print(self.imu_frame_.read_buf.read_msg[i])
                CRC16 = CRC16_Table(self.imu_frame_.frame.data.data_buff, IMU_LEN)
                print("imu data buf: ", self.imu_frame_.frame.data.data_buff)
                if self.debug_:
                    print("CRC16: ", CRC16)
                    print("head_crc16: ", head_crc16)
                    print("head_crc16_h: ", head_crc16_h)
                    print("head_crc16_l: ", head_crc16_l)
                    if_right = (int(head_crc16) == int(CRC16))
                    print("if_right: ", if_right)
                if head_crc16 != CRC16:
                    print("check crc16 failed (imu)")
                    continue
                elif(self.imu_frame_.frame.frame_end != FRAME_END):
                    print("check frame end.")
                    continue
            elif head_type == TYPE_AHRS:
                head_crc16_l = self.ahrs_frame_.frame.header.header_crc16_l
                head_crc16_h = self.ahrs_frame_.frame.header.header_crc16_h
                head_crc16 = head_crc16_l + (head_crc16_h << 8)
                for i in range(AHRS_LEN+1): # 48+1
                    self.ahrs_frame_.read_buf.read_msg[i] = self.read_serial(1)
                CRC16 = CRC16_Table(self.ahrs_frame_.frame.data.data_buf, AHRS_LEN)
                if self.debug_:
                    print("CRC16: ", CRC16)
                    print("head_crc16: ", head_crc16)
                    print("head_crc16_h: ", head_crc16_h)
                    print("head_crc16_l: ", head_crc16_l)
                    if_right = (int(head_crc16) == int(CRC16))
                    print("if_right: ", if_right)
                if head_crc16 != CRC16:
                    print("check crc16 failed (ahrs)")
                    continue
                elif self.ahrs_frame_.frame.frame_end != FRAME_END:
                    print("check frame end.")
                    continue
            elif head_type == TYPE_INSGPS:
                head_crc16 = self.insgps_frame_.frame.header.header_crc16_l + int(self.insgps_frame_.frame.header.header_crc16_h << 8)
                for i in range(INSGPS_LEN+1): # 48+1
                    self.insgps_frame_.read_buf.read_msg[i] = self.read_serial(1)
                CRC16 = CRC16_Table(self.ahrs_frame_.frame.data.data_buf, AHRS_LEN)
                if head_crc16 != CRC16:
                    print("check crc16 failed (insgps)")
                    continue
                elif self.ahrs_frame_.frame.frame_end != FRAME_END:
                    print("check frame end.")
                    continue
            print("PASS")
            # publish magyaw topic

                

   


    

In [2]:
node = IMU_NODE() # debug=True

serial port initialized


In [3]:
node.processLoop()

processLoop is starting
CRC8 passed
16
20
241
185
28
216
7
187
224
127
161
57
157
78
156
61
96
3
222
188
20
217
27
193
0
0
40
67
0
128
196
195
0
0
144
66
92
63
28
66
128
230
197
71
92
63
28
66
107
180
128
79
2
0
0
0
253
imu data buf:  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
check crc16 failed (imu)
CRC8 passed
check crc16 failed (ahrs)
CRC8 passed
144
254
147
185
72
168
77
187
224
127
161
57
56
178
145
61
94
209
151
188
198
231
27
193
0
128
38
67
0
192
201
195
0
0
147
66
0
64
28
66
128
230
197
71
0
64
28
66
75
254
186
79
2
0
0
0
253
imu data buf:  [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
check crc16 failed (imu)
CRC8 passed
check crc16 failed (ahrs)
CRC8 passed
check crc16 failed (ahrs)


KeyboardInterrupt: 

In [4]:
node.destroy_node()
rclpy.shutdown

<function rclpy.shutdown(*, context: rclpy.context.Context = None) -> None>

In [7]:
node.read_serial(3)

[132, 189, 94]

In [16]:
d = b'\xfc\x01'
list(d)

[252, 1]

In [4]:
a = [0]*10
a[0] = 1
a

[1, 0, 0, 0, 0, 0, 0, 0, 0, 0]

ValueError: invalid literal for int() with base 16: b'\xfc'