/
B0602Lidar.py
171 lines (151 loc) · 6.45 KB
/
B0602Lidar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#import pyserial library
import serial
import time
import numpy as np
# Store 1 frame of data
class B0602Lidar:
def __init__(self, port='/dev/cu.SLAB_USBtoUART', baudrate=115200):
#def __init__(self, port='/dev/ttyUSB0', baudrate=115200):
'''Initilize B0602Lidar object for communicating with the sensor.
Parameters
----------
port : str
Serial port name to which sensor is connected
baudrate : int, optional
Baudrate for serial connection (the default is 115200)
'''
print("Initializing the B0602 Lidar")
print("Port:",port)
self.ser = None
self.port = ''
self.running = True
self.baudrate = 115200
self.angle = 0
self.signalStrength = 0
self.dist = 0
self.debug = False
self.single_scan = {}
# realted bit
self.flag = []
self.data = []
self.length = 0
self.data_length = 0
self.cmd = ''
self.cyc = 0B11000000000000101
self.zero_offset = 0
self.rpm = 0
self.port = port
self.baudrate = baudrate
self.ser = self._connect()
def _connect(self):
'''Connects to the serial port with the name `self.port`.'''
ser = serial.Serial()
ser.baudrate = 115200
ser.port = self.port
try:
ser.open()
if ser.is_open:
print("B0602 Lidar Connected")
except serial.serialutil.SerialException as err:
print(err)
exit(0)
return ser
def _bytes_to_int(self,input_bytes): # change byte to int
isinstance(input_bytes, bytes) or exit(99)
if (len(input_bytes) == 0): return 0
# (input_bytes[0] < 0x80) or exit (98)
shift = i1 = 0
for p in range(1, len(input_bytes) + 1):
i1 += (input_bytes[-p] << shift)
shift += 8
return i1
def run_thread(self):
return self.single_scan
def update(self):
while self.running:
self.run()
def run(self):
self.data.clear()
self.single_scan.clear()
while 1:
c = self.ser.read()
if self.debug: print(" ",c)
# wait till the start of the frame
if c == b'\xAA': # flag
cur = time.time()
if self.debug: print("Current time tick:", cur)
if self.debug: print("Package start")
temp = []
temp = self.ser.read(2)
if self.debug: print("Length bit:", format(temp[0], '02x'), format(temp[1], '02x'))
length = self._bytes_to_int(temp)
if self.debug: print("Length of packet:", length)
addr = self.ser.read()
if addr == b'\x00':
if self.debug: print("addr = 0x00")
else:
print("Addr not match: addr =",format(ord(addr), '02x'))
continue
type = self.ser.read()
if type == b'\x61':
if self.debug: print("type = 0x61")
else:
print("type not match: type =",type)
continue
cmd = self.ser.read()
if cmd == b'\xAD':
if self.debug: print("Command = 0xAD")
else:
if cmd == b'\xAE':
print("Command = \xAE")
print("Lidar Sensor Error")
break
else:
print("cmd not match: cmd =", format(cmd, '02x'))
data_length_byte = self.ser.read(2)
data_length = self._bytes_to_int(data_length_byte)
if self.debug: print("Expected data length =", data_length)
if length - data_length == 8:
if self.debug: print("bit length confirmed")
else:
print("bit length not match, length =",length - data_length)
break
rpm = self.ser.read()[0] * 0.05
if self.debug: print("(r/s) =", rpm)
zero_offset_bit = self.ser.read(2)
if self.debug: print("Zero offset bit:", format(zero_offset_bit[0], '02x'), format(zero_offset_bit[1], '02x'))
zero_offest = self._bytes_to_int(zero_offset_bit)
if zero_offest >= 2 ** 15:
zero_offest -= 2 ** 16
if self.debug: print("Zero offset(0.01degree) =", zero_offest)
if self.debug: print("Data Point in packet:",int((data_length - 5)/ 3))
data_pts = int((data_length - 5)/ 3)
angle_bit = self.ser.read(2)
if self.debug: print("Angle bit:", format(angle_bit[0], '02x'), format(angle_bit[1], '02x'))
start_angle = self._bytes_to_int(angle_bit)/100 + zero_offest
if start_angle > 360:
start_angle = start_angle - 360
if self.debug: print("Angle(0.01degree)", start_angle)
for i in range(1, data_pts+1):
if self.debug: print("Loop count:", i)
signalStrength_bit = self.ser.read()
if self.debug: print("signal Strength bit:", format(signalStrength_bit[0], '02x'))
signalStrength = ord(signalStrength_bit)
if self.debug: print("signal Strength:", format(signalStrength))
dist_bit = self.ser.read(2)
if self.debug: print("Dist bit:", format(dist_bit[0], '02x'), format(dist_bit[1], '02x'))
dist = self._bytes_to_int(dist_bit) * 0.25 # in mm
angle = start_angle + (22.5 * (i-1) / data_pts)
if self.debug: print("Distance (mm):", dist)
if self.debug: print("angle (0.01degree):", angle)
self.data.append([angle, dist])
end = time.time()
if self.debug: print("Time take for 1 packet", (end - cur) * 1000)
if self.debug: print("Data package:", self.data)
self.single_scan[start_angle] = self.data
self.single_scan['r/s'] = rpm
#print("Data:",self.single_scan)
#TODO: Add CRC check
crc = self.ser.read(2)
if self.debug: print("CRC check:",crc)
return self.single_scan