In [1]:
# 埼玉大学データサイエンス技術研究会 
# 第6回研究会 (2024/6/14) サンプルコード
# 作成者: 平松 薫
#
# 05_acc_2.py: 最新の加速度データを取得するプログラム #
# 
# 使い方: python 05_acc_2.py
#

import os
import sys
import time
from datetime import datetime
from struct import pack, unpack
import serial

# シリアルポートの設定
SERIAL_PORT = "COM3"
SERIAL_BAUDRATE = 115200

In [9]:
def calc_crc(buf, length):
    """
    CRC-16 を計算する関数

    """
    crc = 0xFFFF
    for i in range(length):
        crc = crc ^ buf[i]
        for i in range(8):
            carrayFlag = crc & 1
            crc = crc >> 1
            if (carrayFlag == 1):
                crc = crc ^ 0xA001
    crcH = crc >> 8
    crcL = crc & 0x00FF
    return (bytearray([crcL, crcH]))

def s16(value):
    return -(value & 0x8000) | (value & 0x7fff)

def serial_write(_ser, _payload):
    """
    シリアルポートにコマンドを送信する関数
    _payload の前にヘッダと_payloadの長さを付加、後に CRC-16 を付加して送信
    """
    _command = b'\x52\x42' + pack('<H', len(_payload) + 2) + _payload
    _command = _command + calc_crc(_command, len(_command))
    _ser.write(_command)
    _ser.flush()
    time.sleep(0.1)
    return

def serial_read(_ser, _payload, timeout=0.0):
    """
    環境センサにコマンドを送信し、レスポンスを取得する関数
    _payload の前にヘッダと_payloadの長さを付加、後に CRC-16 を付加して送信
    レスポンスはシリアルポートから読み込み、そのまま返す
    """
    if len(_payload) > 0:
        serial_write(_ser, _payload)
    else:
        return b''
    
    ret = b''
    _ser_len = 0
    while _ser_len == 0:
        time.sleep(0.1)
        _ser_len = _ser.inWaiting()
    while _ser_len > 0:
        ret = ret + _ser.read(_ser_len)
        _ser_len = _ser.inWaiting()
        _tmp_timtout = 0.0
        while _ser_len == 0:
            _ser_len = _ser.inWaiting()
            _tmp_timtout = _tmp_timtout + 0.1
            # print('## loop', _tmp_timtout, _ser_len)
            time.sleep(0.1)
            if _tmp_timtout > timeout:
                break

    # ret = _ser.read(_ser_len)
    if ret[0:2] != b'\x52\x42':
        raise print("Invalid Header", ret)
    if ret[4] != 1 and ret[4] != 2:
        for i in range(len(ret)):
            print(f'({i}) {ret[i]:02x}', end=' ')
        print()
        raise print("Error Response", hex(ret[4]))
    return ret


def dump_data(_ret):
    for i in range(len(_ret)):
        print(f'({i}) {_ret[i]:02x}', end=' ')
        if i % 16 == 15:
            print()
    print()
    return

In [3]:
def led_off(_ser):
    """
    LEDを消灯する関数
    """
    print('LED OFF')
    _payload = bytearray([0x02, # Read 0x01, Write 0x02
                          0x11, 0x51, # LED設定 (0x5111 をリトルエンディアンで送信)
                          0x00, 0x00, # LED常時消灯 (0x0000 をリトルエンディアンで送信)
                          0x00, 0x00, 0x00]) # 色設定　RGB ここでは赤に設定
    ret = serial_read(_ser, _payload)
    dump_data(ret)
    return

In [4]:
def led_on(_ser):
    """
    LEDを点灯する関数
    """
    print('LED ON')
    _payload = bytearray([0x02, # Read 0x01, Write 0x02
                          0x11, 0x51, # LED設定 (0x5111 をリトルエンディアンで送信)
                          0x01, 0x00, # LEDを点灯 (0x0001 をリトルエンディアンで送信)
                          0xFF, 0xFF, 0xFF]) # 色設定　RGB ここでは白に設定
    ret = serial_read(_ser, _payload)
    dump_data(ret)
    return

In [5]:
def check_logger_status(_ser):
    """
    ロガーの状態を確認する関数
    """
    print('### check_logger_status ###')
    _payload = bytearray([0x01, # Read 0x01, Write 0x02
                         0x19, 0x51]) # Acceleration logger status (Address: 0x5119 をリトルエンディアンで送信)
    ret = serial_read(_ser, _payload)
    dump_data(ret)
    print('Logger status', ret[7], '(0x00: Waiting 0x01: Running')
    print('Running page', f'0x{ret[8]|ret[9]<<8:04x} (Range: 0x0001 to 0x2800 (1 to 10240))')
    return ret

In [6]:
def logging_start(_ser):
    """
    加速度データの記録を開始
    """
    print('### Check mode to Acceleration data logging...')
    _payload = bytearray([0x01, # Read 0x01, Write 0x02
                        0x17, 0x51]) # Mode change (Address: 0x5117 をリトルエンディアンで送信)
    ret = serial_read(_ser, _payload)
    dump_data(ret)
    if ret[7] != 0x01:
        print('### Change mode to Acceleration data logging...')
        payload = bytearray([0x02, # Read 0x01, Write 0x02
                            0x17, 0x51, # Mode change (Address: 0x5117 をリトルエンディアンで送信)
                            0x01]) # 0x00: Normal mode (default) 0x01: Acceleration logger mode
        ret = serial_read(_ser, payload)
        dump_data(ret)
        # time.sleep(60) # logger mode 切り替え時に Flash memory が消去(LED点灯)され、それが終了するまでスリープする
    else:
        print('### Reset Acceleration data area...')
        payload = bytearray([0x02, # Read 0x01, Write 0x02
                            0x16, 0x51, # Memory reset (Address: 0x5116 をリトルエンディアンで送信)
                            0x02]) # 0x01: Sensing data area 0x02: Acceleration area
        ret = serial_read(_ser, payload)
        dump_data(ret)
        # time.sleep(60) # logger mode 切り替え時に Flash memory が消去(LED点灯)され、それが終了するまでスリープする

    print('### Acceleration data logging start... ', end=' ')
    _payload = bytearray([0x02, # Read 0x01, Write 0x02
                        0x18, 0x51, # Acceleration logger control (0x5118 をリトルエンディアンで送信)
                        0x01, # 0x00: Log stop 0x01: Log start
                        0x00, # Range of detection (固定値)
                        0x03, # ODR setting (0x00: 1 Hz 0x02: 25 Hz 0x03: 100 Hz 0x04: 200 Hz 0x05: 400 Hz)
                        0x01, 0x00, # Start page (range 0x0001 to 0x2800 (0x0001 をリトルエンディアンで送信))
                        0x00, 0x28]) # End page (0x2800 をリトルエンディアンで送信)
    ret = serial_read(_ser, _payload)
    _start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    print(_start_time)
    dump_data(ret)
    return _start_time

In [7]:
def logging_stop(_ser):
    """
    加速度データの記録を終了
    """

    print('### Check mode to Acceleration data logging...')
    ret = check_logger_status(_ser)
    dump_data(ret)

    if ret[7] == 0x01:
        print('### Acceleration data logging stop ###', end=' ')
        _payload = bytearray([0x02, # Read 0x01, Write 0x02
                            0x18, 0x51, # Acceleration logger control (0x5118 をリトルエンディアンで送信)
                            0x00, # 0x00: Log stop 0x01: Log start
                            0x00, # Range of detection (固定値)
                            0x03, # ODR setting (0x00: 1 Hz 0x02: 25 Hz 0x03: 100 Hz 0x04: 200 Hz 0x05: 400 Hz)
                            0x01, 0x00, # Start page (range 0x0001 to 0x2800 (0x0001 をリトルエンディアンで送信))
                            0x00, 0x28]) # End page (0x2800 をリトルエンディアンで送信)
        ret = serial_read(_ser, _payload)
        _stop_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        print(_stop_time)
        dump_data(ret)
    else:
        print('### Acceleration data logging is not running ###')
        _stop_time = ''

    print('### Check mode to Acceleration data logging...')
    ret = check_logger_status(_ser)
    dump_data(ret)

    return _stop_time

In [8]:
def dump_acc_data(_acc_data):
    print(len(_acc_data))
    for i in range(0, len(_acc_data)-1, 6):
        x = s16(_acc_data[i+1] << 8 | _acc_data[i]) * 0.1
        y = s16(_acc_data[i+3] << 8 | _acc_data[i+2]) * 0.1
        z = s16(_acc_data[i+5] << 8 | _acc_data[i+4]) * 0.1
        print(f'{i/6} x={x:.2f} y={y:.2f} z={z:.2f}')
    return

In [10]:
# シリアルポートをオープン
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE, serial.EIGHTBITS, serial.PARITY_NONE, write_timeout=1, timeout=1)
ser.reset_input_buffer()
ser.reset_output_buffer()

In [11]:
payload = bytearray([0x01, # Read 0x01, Write 0x02
                     0x02, 0x52, # Time setting (Address: 0x5202 をリトルエンディアンで送信)
                     ])
ret = serial_read(ser, payload)
current_timecounter  = int.from_bytes(ret[7:15], 'little')
print('current_timecounter (r)', current_timecounter)
if current_timecounter == 0:
    payload = bytearray([0x02, # Read 0x01, Write 0x02
                        0x02, 0x52 # Time setting (Address: 0x5202 をリトルエンディアンで送信)
                        ]) + pack('<Q', 1) # UInt64 で 1 送信
    ret = serial_read(ser, payload)
    current_timecounter  = int.from_bytes(ret[7:15], 'little')
    print('current_timecounter (w)', current_timecounter)

payload = bytearray([0x01, # Read 0x01, Write 0x02
                     0x03, 0x52, # Memory storage interval (Address: 0x5203 をリトルエンディアンで送信)
                     ])
ret = serial_read(ser, payload)
storage_interval  = int.from_bytes(ret[7:9], 'little')
print('storage_interval (r)', storage_interval)
if storage_interval < 3600:
    payload = bytearray([0x02, # Read 0x01, Write 0x02
                        0x03, 0x52 # Memory storage interval (Address: 0x5203 をリトルエンディアンで送信)
                        ]) + pack('<H', 300) # UInt16 で 3600 送信
    ret = serial_read(ser, payload)
    storage_interval  = int.from_bytes(ret[7:9], 'little')
    print('storage_interval (w)', storage_interval)

current_timecounter (r) 0
current_timecounter (w) 1
storage_interval (r) 3600


In [12]:
ret = check_logger_status(ser)

### check_logger_status ###
(0) 52 (1) 42 (2) 08 (3) 00 (4) 01 (5) 19 (6) 51 (7) 00 (8) 00 (9) 00 (10) f7 (11) 66 
Logger status 0 (0x00: Waiting 0x01: Running
Running page 0x0000 (Range: 0x0001 to 0x2800 (1 to 10240))


In [14]:
_start_time = logging_start(ser)

### Check mode to Acceleration data logging...
(0) 52 (1) 42 (2) 06 (3) 00 (4) 01 (5) 17 (6) 51 (7) 01 (8) eb (9) 24 
### Reset Acceleration data area...
(0) 52 (1) 42 (2) 06 (3) 00 (4) 82 (5) 16 (6) 51 (7) 05 (8) 92 (9) a3 
Error Response 0x82


TypeError: exceptions must derive from BaseException

In [15]:
ret = check_logger_status(ser)

### check_logger_status ###
(0) 52 (1) 42 (2) 08 (3) 00 (4) 01 (5) 19 (6) 51 (7) 01 (8) 7a (9) 00 (10) 85 (11) c6 
Logger status 1 (0x00: Waiting 0x01: Running
Running page 0x007a (Range: 0x0001 to 0x2800 (1 to 10240))


In [16]:
_stop_time = logging_stop(ser)

### Check mode to Acceleration data logging...
### check_logger_status ###
(0) 52 (1) 42 (2) 08 (3) 00 (4) 01 (5) 19 (6) 51 (7) 01 (8) ad (9) 00 (10) da (11) 36 
Logger status 1 (0x00: Waiting 0x01: Running
Running page 0x00ad (Range: 0x0001 to 0x2800 (1 to 10240))
(0) 52 (1) 42 (2) 08 (3) 00 (4) 01 (5) 19 (6) 51 (7) 01 (8) ad (9) 00 (10) da (11) 36 
### Acceleration data logging stop ### 2024-04-18 17:51:08.331113
(0) 52 (1) 42 (2) 0c (3) 00 (4) 02 (5) 18 (6) 51 (7) 00 (8) 00 (9) 03 (10) 01 (11) 00 (12) 00 (13) 28 (14) 9b (15) d9 

### Check mode to Acceleration data logging...
### check_logger_status ###
(0) 52 (1) 42 (2) 08 (3) 00 (4) 01 (5) 19 (6) 51 (7) 00 (8) ae (9) 00 (10) 8b (11) 06 
Logger status 0 (0x00: Waiting 0x01: Running
Running page 0x00ae (Range: 0x0001 to 0x2800 (1 to 10240))
(0) 52 (1) 42 (2) 08 (3) 00 (4) 01 (5) 19 (6) 51 (7) 00 (8) ae (9) 00 (10) 8b (11) 06 


In [17]:
ret = check_logger_status(ser)

### check_logger_status ###
(0) 52 (1) 42 (2) 08 (3) 00 (4) 01 (5) 19 (6) 51 (7) 00 (8) ae (9) 00 (10) 8b (11) 06 
Logger status 0 (0x00: Waiting 0x01: Running
Running page 0x00ae (Range: 0x0001 to 0x2800 (1 to 10240))


In [18]:
ret = check_logger_status(ser)
_start_page = 0x0001
_end_page = ret[8] | ret[9]<<8
_payload = bytearray([0x01, # Read 0x01, Write 0x02
                      0x3F, 0x50, # Acceleration memory data [Data] (Address: 0x503F)
                      0x02, # Acceleration data type UInt8 0x02: Logger data
                      0x01, # Request acceleration memory index UInt8 0x01: Fixed value
                      0x01, 0x00, # Request page (Start page)
                      ret[8], ret[9]]) # Request page (End page)
ret = serial_read(ser, _payload, timeout=1.0)
for i in range(_end_page):
    print(f'### page 0x{i+1:04x}')
    dump_data(ret[i*237:(i+1)*237])
    dump_acc_data(ret[i*237+43:(i+1)*237-2])

### check_logger_status ###
(0) 52 (1) 42 (2) 08 (3) 00 (4) 01 (5) 19 (6) 51 (7) 00 (8) ae (9) 00 (10) 8b (11) 06 
Logger status 0 (0x00: Waiting 0x01: Running
Running page 0x00ae (Range: 0x0001 to 0x2800 (1 to 10240))
### page 0x0001
(0) 52 (1) 42 (2) e9 (3) 00 (4) 01 (5) 3f (6) 50 (7) 01 (8) 00 (9) 00 (10) 00 (11) 00 (12) 00 (13) 00 (14) 00 (15) 00 
(16) 00 (17) 00 (18) 00 (19) 00 (20) 00 (21) b7 (22) 08 (23) 04 (24) 17 (25) 69 (26) 00 (27) 62 (28) 4a (29) 0f (30) 00 (31) b5 
(32) 16 (33) 19 (34) 00 (35) 35 (36) 02 (37) ef (38) 1a (39) de (40) 07 (41) ff (42) ff (43) 4a (44) 00 (45) 63 (46) d9 (47) 9a 
(48) fb (49) 4a (50) 00 (51) 63 (52) d9 (53) bf (54) fb (55) 4a (56) 00 (57) 63 (58) d9 (59) e5 (60) fb (61) 4a (62) 00 (63) 89 
(64) d9 (65) e5 (66) fb (67) 4a (68) 00 (69) 89 (70) d9 (71) e5 (72) fb (73) 4a (74) 00 (75) 89 (76) d9 (77) bf (78) fb (79) 4a 
(80) 00 (81) 63 (82) d9 (83) e5 (84) fb (85) 70 (86) 00 (87) 63 (88) d9 (89) e5 (90) fb (91) 25 (92) 00 (93) 63 (94) d9 (95) e5 
(

In [19]:
print('### Check mode to Acceleration data logging...')
_payload = bytearray([0x01, # Read 0x01, Write 0x02
                      0x17, 0x51]) # Mode change (Address: 0x5117 をリトルエンディアンで送信)
ret = serial_read(ser, _payload)
dump_data(ret)
if ret[7] == 0x01:
    print('### Change mode to Acceleration data logging...')
    payload = bytearray([0x02, # Read 0x01, Write 0x02
                        0x17, 0x51, # Mode change (Address: 0x5117 をリトルエンディアンで送信)
                        0x00]) # 0x00: Normal mode (default) 0x01: Acceleration logger mode
    ret = serial_read(ser, payload)
    dump_data(ret)

print('serial port close')
ser.close()

### Check mode to Acceleration data logging...
(0) 52 (1) 42 (2) 06 (3) 00 (4) 01 (5) 17 (6) 51 (7) 01 (8) eb (9) 24 
### Change mode to Acceleration data logging...
(0) 52 (1) 42 (2) 06 (3) 00 (4) 02 (5) 17 (6) 51 (7) 00 (8) 2a (9) a0 
serial port close
