In [5]:
import serial.tools.list_ports as Ports
from typing import List
import serial
import re
import time
    

## Identifying connections and ports

In [161]:

ports = Ports.comports()
for port in ports:
    print(f"Port: {port.device}, Description: {port.description}")


Port: /dev/cu.BLTH, Description: n/a
Port: /dev/cu.Bluetooth-Incoming-Port, Description: n/a
Port: /dev/cu.usbmodemN32G45x1, Description: N32G45x Port  


## SIYI Datalink SDK

| **Field**   | **Index** | **Bytes** | **Description**                                                                                                                        |
|-------------|-----------|-----------|----------------------------------------------------------------------------------------------------------------------------------------|
| **STX**     | 0         | 2         | Starting mark, value: 0x5566                                                                                                           |
| **CTRL**    | 2         | 1         | Control byte with the following bits:                                                                                                  |
|             |           |           | - **bit 0**: `need_ack` (if the current package needs acknowledgment)                                                                  |
|             |           |           | - **bit 1**: `ack_pack` (if the package is an acknowledgment package)                                                                  |
|             |           |           | - **bits 2-7**: Reserved                                                                                                               |
| **Data_len**| 3         | 2         | Length of the data field in bytes. The low byte is first.                                                                             |
| **SEQ**     | 5         | 2         | Frame sequence number ranging from 0 to 65535. The low byte is first.                                                                 |
| **CMD_ID**  | 7         | 1         | Command identifier                                                                                                                     |
| **DATA**    | 8         | `Data_len`| The data payload. The length is determined by the `Data_len` field.                                                                    |
| **CRC16**   |   | 2    | CRC16 checksum for the entire data package.                                                                                           |


In [176]:
ser = serial.Serial(port= '/dev/cu.usbmodemN32G45x1',
                    baudrate= 57600,
                    timeout= 1
                    )
data2=  '55 66 01 01 00 00 00 42 02 B5 C0' # 2 HZ
# data0 = '55 66 01 01 00 00 00 42 00 F7 E0' # 0 HZ

if not ser.is_open:
    ser.open()

ser.write(bytes.fromhex(data2))

# res1 = ser.read(ser.in_waiting).hex()
res1 = ser.readline().hex()
# res1 = res1.hex()
print('Got the following response:\n', res1)

# ser.write(bytes.fromhex(data2))  
# res2 = ser.read(ser.in_waiting)
# print(ser.in_waiting)
# print(res2.hex())

# ser.close()


Got the following response:
 dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a047a7355660020002f5142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a044f145566002000305142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04b82b5566002000315142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a048d4c5566002000325142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04d2e55566002000335142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04e7825566002000345142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a044da75566002000355142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a0478c05566002000365142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a0427695566002000375142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04120e5566002000385142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a0473225566002000395142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e80

In [177]:
pattern = re.compile(r'5566.{0,80}', re.IGNORECASE)

# match = pattern.search(res1)

# match.group()
pattern.findall(res1)

['55660020002f5142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a044f14',
 '5566002000305142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04b82b',
 '5566002000315142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a048d4c',
 '5566002000325142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04d2e5',
 '5566002000335142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04e782',
 '5566002000345142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a044da7',
 '5566002000355142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a0478c0',
 '5566002000365142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a042769',
 '5566002000375142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04120e',
 '5566002000385142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a047322',
 '5566002000395142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a044645',
 '55660020003a5142dc0

In [191]:
# Find '5566' followed by any two characters and capture the next four characters
pattern = re.compile(r'5566..(.{4})')

# Find all matches
matches = pattern.findall(res1)
matches

['2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 '2000',
 

In [205]:
data_len = 64

pattern_string = fr'5566.{{12}}.{{{data_len}}}.{{4}}'
pattern = re.compile(pattern_string)
matches = pattern.findall(res1)
matches

['55660020002f5142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a044f14',
 '5566002000305142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04b82b',
 '5566002000315142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a048d4c',
 '5566002000325142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04d2e5',
 '5566002000335142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04e782',
 '5566002000345142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a044da7',
 '5566002000355142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a0478c0',
 '5566002000365142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a042769',
 '5566002000375142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04120e',
 '5566002000385142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a047322',
 '5566002000395142dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a044645',
 '55660020003a5142dc0

In [189]:
byte_value = bytes.fromhex('0020')
decimal_value = int.from_bytes(byte_value)
print('Data length in bytes:',decimal_value)  
print('Data length in hex:',decimal_value *2)

Data length in bytes: 32
Data length in hex: 64


In [194]:
q = '2000'

q = q[2]+q[3]+q[0]+q[1]
q

'0020'

In [12]:
def main():

    ser = serial.Serial(port= '/dev/cu.usbmodemN32G45x1',
                    baudrate= 57600,
                    timeout= 1
                    )
    data2=  '55 66 01 01 00 00 00 42 02 B5 C0' # 2 HZ

    if not ser.is_open:
        ser.open()

    ser.write(bytes.fromhex(data2))

    # res1 = ser.read(ser.in_waiting).hex()
    res1 = ser.readline().hex()
    # res1 = res1.hex()
    print('Got the following response:\n', res1)

    data = data_extractor(res1)

    return data

In [7]:
def datalen_finder(ser : str) -> List[str]:
    ''' 
    This function extracts the initial-raw length from the stream of input data.
    '''

    # Find '5566' followed by any two characters and capture the next four characters
    pattern = re.compile(r'5566..(.{4})')

    # Find all matches
    matches = pattern.findall(ser)
    
    return matches

In [8]:
def datalen_modifier(len_hex : str) -> str:

    '''
    Data length in hex format, low byte (LSB) in front based on datasheet. 
    This function modifies this to MSB in front
    '''

    len = len_hex[2]+len_hex[3]+len_hex[0]+len_hex[1]

    return len

In [4]:
def datalen_determiner(revised_len : str) -> int:

    ''' 
    This function finds the data length in hex. 
    '''
    byte_value = bytes.fromhex('0020')
    decimal_value = int.from_bytes(byte_value)
    print('Data length in bytes:',decimal_value)  
    print('Data length in hex:',decimal_value *2)

    return decimal_value *2


In [10]:
def data_extractor(ser : str) -> str: 

    ''' 
    Extracts data from a sequence of serial hexadecimal characters
    '''

    for raw_len in datalen_finder (ser):

        revered = datalen_modifier (raw_len)
        data_len = datalen_determiner(revered)
        pattern_string = fr'5566.{{12}}.{{{data_len}}}.{{4}}'
        pattern = re.compile(pattern_string)
        matches = pattern.findall(res1)
        
        return matches

In [None]:
# Defining dataframe structure

start_frame = '5566'
crtl_len = 2
expected_data_len = 4
frame_seq_len = 4
mcd_id_len = 2
crc16_len = 4

In [156]:
import re
import serial

ser = serial.Serial(port= '/dev/cu.usbmodemN32G45x1',
                    baudrate= 57600,
                    timeout= 1
                    )

data2=  '55 66 01 01 00 00 00 42 02 B5 C0' # 2 HZ

pattern = re.compile(r'5566.*?b5c0', re.IGNORECASE)


def find_pattern_in_stream():
    buffer = ""
    
    ser.write(bytes.fromhex(data2))

    if not ser.is_open:
        ser.open()

    while True:
        
        data = ser.read(1000)

        if not data:
            continue
        
        hex_data = data.hex()
        
        buffer += hex_data
        
        match = pattern.search(buffer)

        if match:

            matched_data = match.group()
            print(f"Here is your pattern: {matched_data}")
            

try:
    find_pattern_in_stream()
except KeyboardInterrupt:
    print("Stopped by user.")
finally:
    ser.close()

Stopped by user.


In [178]:
len('dc05dc05dc05dc05dc051a041a04dc051a041a041a04dc05dc05e8031a041a04')

64

Should be = \
55 66 00 20 00 99 00 42 DC 05 DC 00 DC 05 DC 05 DC 05 DC 05 DC 05 DC 05 DC 05 DC 05 DC 05 1A 04 DC 05 DC 05 1A 04 1A 04 FF 88

In [168]:
text = '55 66 00 20 00 99 00 42 DC 05 DC 00 DC 05 DC 05 DC 05 DC 05 DC 05 DC 05 DC 05 DC 05 DC 05 1A 04 DC 05 DC 05 1A 04 1A 04 FF 88'
text = text.replace(" ", "").lower()
datalen = len(text)
datalen

84

In [52]:

# Which port? what is the baud rate?
serial_port = '/dev/cu.usbmodemN32G45x1'
baud_rate = 9600

# Opening the serial port
ser = serial.Serial(port=serial_port, 
                    baudrate=baud_rate,
                    # stopbits=serial.STOPBITS_ONE,
                    timeout=1)

# Reading the data from the serial port continuously
# Error handling covered

if ser:

    # Initializing the communication with controller
    # ser.write(b'1')
    # time.sleep(0.1)

    # Reading the data from the serial port continuously
    # Error handling covered
    try:
        while True:
            if ser.in_waiting > 0:  # Is there some data waiting??
                data = ser.readline().decode('utf-8').strip() # Decoding bytes to string, strip whitespace
                print(data)
            time.sleep(0.1)  # Add a small delay to prevent high CPU usage

    # Keyboard interupt included
    except serial.SerialTimeoutException:
        print("Read operation timed out")

    except serial.SerialException as e:
        print(f"Serial error: {e}")

    except KeyboardInterrupt:
        print("Keyboard interrupt recieved")

    finally:
        if ser and ser.is_open:   
            ser.close()



Keyboard interrupt recieved


In [180]:
# Extracting different parts of the message
data = b'Uf\x01\x01\x00\x00\x00B\x02\xb5\xc0'

header = data[:2]
crtl = data[2]
flag = data[3]
length = int.from_bytes(data[4:7])  
command_id = data[7]
additional_flag = data[8]
checksum = data[9:11]

print("Header:", header.hex())
print("Flag:", flag)
print("Length:", length)
print("Command ID:", command_id)
print("Additional Flag:", additional_flag)
print("Checksum:", checksum.hex())


Header: 5566
Message Type: 6
Flag: 1
Length: 0
Command ID: 66
Additional Flag: 2
Checksum: b5c0
