## Simple Measurement System

In [19]:
import adafruit_bno055
import busio
import board
import threading
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
path = os.getcwd()
from IPython.display import display, clear_output
from collections import deque
from time import perf_counter
import ipywidgets as widgets
import datetime


In [20]:
class mesurement:
    
    def __init__(self, BNO_UPDATE_FREQUENCY_HZ=10, INTERVAL=1000):
        
        self.COLUMNS = ["Time","euler_x", "euler_y", "euler_z", "gyro_x", "gyro_y", "gyro_z", "gravity_x", "gravity_y", "gravity_z",
                        "linear_accel_x", "linear_accel_y", "linear_accel_z","accel_x", "accel_y", "accel_z",
                        "quaternion_1", "quaternion_2", "quaternion_3", "quaternion_4", 
                        "calibstat_sys", "calibstat_gyro", "calibstat_accel", "calibstat_mag"]
    
        self.BNO_UPDATE_FREQUENCY_HZ = BNO_UPDATE_FREQUENCY_HZ
        self.exepath = path + '/measurement_system'
        self.datapath = path + '/data'
    
    
        self.INTERVAL = INTERVAL
        self.INIT_LEN = self.INTERVAL // self.BNO_UPDATE_FREQUENCY_HZ

        self.Time_data = deque(np.zeros(self.INIT_LEN))# Time
        self.linear_accel_x_data = deque(np.zeros(self.INIT_LEN))# linear_accel_x
        self.linear_accel_y_data = deque(np.zeros(self.INIT_LEN))# linear_accel_y
        self.linear_accel_z_data = deque(np.zeros(self.INIT_LEN))# linear_accel_z
        self.gyro_x_data = deque(np.zeros(self.INIT_LEN))# gyro_x
        self.gyro_y_data = deque(np.zeros(self.INIT_LEN))# gyro_y
        self.gyro_z_data = deque(np.zeros(self.INIT_LEN))# gyro_z
        
        self.assy_data = None
        
        self.i2c, self.bno = self.connect()

            
    def connect(self):
        i2c = busio.I2C(board.SCL, board.SDA)# Access i2c using SCL and SDA
        bno = adafruit_bno055.BNO055_I2C(i2c)# Connect BNO055. Default: NDOF_MODE(12)
        #self.bno.use_external_crystal = True
        print("Established connection with BNO055")
        return i2c, bno
    
    def get_data(self, bno):
        '''
        Get data from new value from BNO055
        
        '''    
        euler_z, euler_x, euler_y = [val for val in bno.euler]# Euler angle[deg] (Yaw, Roll, Pitch)
        euler_z = euler_z - 180# 180deg offset
        gyro_x, gyro_y, gyro_z = [val for val in bno.gyro]# Gyro[rad/s]
        gravity_x, gravity_y, gravity_z = [val for val in bno.gravity]# Gravitaional aceleration[m/s^2]
        linear_accel_x, linear_accel_y, linear_accel_z = [val for val in bno.linear_acceleration]# Linear acceleration[m/s^2]
        accel_x, accel_y, accel_z = [val for val in bno.acceleration]# Three axis of acceleration(Gravity + Linear motion)[m/s^2]
        quaternion_1, quaternion_2, quaternion_3, quaternion_4 = [val for val in bno.quaternion]# Quaternion
        calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag = [val for val in bno.calibration_status]# Status of calibration
            
        return euler_x, euler_y, euler_z, gyro_x, gyro_y, gyro_z, gravity_x, gravity_y, gravity_z,\
                linear_accel_x, linear_accel_y, linear_accel_z, accel_x, accel_y, accel_z,\
                quaternion_1, quaternion_2, quaternion_3, quaternion_4,\
                calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag
        
    

In [21]:
def wait_process(wait_sec):
    until = perf_counter() + wait_sec
    while perf_counter() < until:
        pass
    return

In [15]:


class measurement_control:
    def __init__(self, BNO_UPDATE_FREQUENCY_HZ = 10, Isautosave = True):
        self.ctrlbutton_state = 'stop'# stop/start
        self.ctrl_button = widgets.Button(description="▶",layout=lay(120,50)) 
        self.save_button = widgets.Button(description="Save",layout=lay(120,50))
        self.calib_button = widgets.Button(description="Calib",layout=lay(120,50))
        
        self.output = widgets.Output(layour={'border': '1px solid black'})       
        self.measurement = mesurement(BNO_UPDATE_FREQUENCY_HZ=BNO_UPDATE_FREQUENCY_HZ)
        self.Isstart = False
        self.start_thread = None
        self.stop_thread = None
        self.save_thread = None
        self.buttonState = "stop"
        self.elapsed_time = 0
        self.itr_end_time = 0
        self.logic_end_time = 0
        self.logic_start_time = 0
        self.itr_start_time = 0
        self.Isautosave = Isautosave
        
    def start(self):
        
        self.Isstart = True
        self.stop_thread = None
        #print('Measurement Start')
        self.ctrl_button.description = '■'
        self.ctrl_button.button_style = 'success'
        
        
        #Measurement main logic
        try:
            self.logic_start_time = time.time()#Logic start time
            counter = 0# Clock

            
            while self.Isstart:
                self.itr_start_time = time.time()# Start time of iteration loop
                Time = counter / self.measurement.BNO_UPDATE_FREQUENCY_HZ# Current time
                
                #Time, euler_x, euler_y, euler_z, gyro_x, gyro_y, gyro_z, gravity_x, gravity_y, gravity_z,\
                #linear_accel_x, linear_accel_y, linear_accel_z, accel_x, accel_y, accel_z,\
                #quaternion_1, quaternion_2, quaternion_3, quaternion_4,\
                #calibstat_sys, calibstat_gyro, calibstat_accel, calibstat_mag]
                                 
                data = np.array([Time] + list(self.measurement.get_data(self.measurement.bno))).reshape(-1, len(self.measurement.COLUMNS))# Get data from sensor and merge time
                
                            
                if counter == 0:
                    
                    self.measurement.assy_data = data.copy()

                else: 
                    self.measurement.assy_data = np.concatenate((self.measurement.assy_data, data), axis = 0)# Concatenate data
        
                self.itr_end_time = time.time()# End time of iteration loop
   
                wait_process((1.0 / self.measurement.BNO_UPDATE_FREQUENCY_HZ) - (self.itr_end_time - self.itr_start_time))# For keeping sampling frequency
                counter += 1

            self.logic_end_time = time.time()
            # Elapsed time
            self.elapsed_time = self.logic_end_time - self.logic_start_time         
            
                            
        except Exception as e:
            self.logic_end_time = time.time()
            # Elapsed time
            self.elapsed_time = self.logic_end_time - self.logic_start_time
            print('Error! Save data')
            self.save()
            return

        
    def stop(self):
        self.Isstart = False
        self.start_thread = None
        try:
            if meas_ctrl.Isautosave:
                self.save()
            self.ctrl_button.description = '▶'
            self.ctrl_button.button_style = ''
        except Exception as e:
            print('Error!')
        
        return

    def save(self):
        self.save_button.description = 'Saving...'
        self.save_button.button_style = 'success'
        self.measurement.assy_data = pd.DataFrame(self.measurement.assy_data, columns=list(self.measurement.COLUMNS))
        
        t_delta = datetime.timedelta(hours=9)
        JST = datetime.timezone(t_delta, 'JST')# You have to set your timezone
        now = datetime.datetime.now(JST)
        timestamp = now.strftime('%Y%m%d%H%M%S')
        self.measurement.assy_data.to_csv(self.measurement.datapath + '/'+ timestamp +'_data.csv')
        
        wait_process(0.5)# For user friendly
        
        self.save_button.description = 'Save'
        self.save_button.button_style = ''
        self.save_thread = None


    def get_calib_stat(self):
        print("sys, gyro, accel, mag: {0}".format(self.measurement.get_data(self.measurement.bno)[19:]))



    def Run(self):
        @self.output.capture()
        def on_click_ctrlbutton_callback(clicked_button: widgets.Button) -> None:
            if self.ctrlbutton_state == 'stop':
                if self.start_thread == None :
                    self.ctrlbutton_state = 'start'
                    self.start_thread = threading.Thread(target=self.start)
                    self.start_thread.start()
                
            elif self.ctrlbutton_state == 'start':
                if self.stop_thread == None :
                    self.ctrlbutton_state = 'stop'
                    self.stop_thread = threading.Thread(target=self.stop)
                    self.stop_thread.start()
                    
        def on_click_savebutton_callback(clicked_button: widgets.Button) -> None:
            if self.ctrlbutton_state == 'stop' and self.Isstart == False:
                self.save_thread = threading.Thread(target=self.save)
                self.save_thread.start()
        
        @self.output.capture()
        def on_click_calibbutton_callback(clicked_button: widgets.Button) -> None:
            self.get_calib_stat()

        self.calib_button.on_click(on_click_calibbutton_callback)
        self.ctrl_button.on_click(on_click_ctrlbutton_callback)
        self.save_button.on_click(on_click_savebutton_callback)

        
def lay(width,height):
    return widgets.Layout(width=str(width)+"px",height=str(height)+"px")
def init_meas_system():
    meas_ctrl = measurement_control(BNO_UPDATE_FREQUENCY_HZ=1, Isautosave=False)
    display(widgets.VBox([
            widgets.HBox([meas_ctrl.ctrl_button, meas_ctrl.save_button, meas_ctrl.calib_button]), 
            meas_ctrl.output]))


    return meas_ctrl
def main_loop():
    meas_ctrl = init_meas_system()
    meas_ctrl.Run()
    return meas_ctrl


In [16]:
meas_ctrl = main_loop()

Established connection with BNO055


VBox(children=(HBox(children=(Button(description='▶', layout=Layout(height='50px', width='120px'), style=Butto…

In [17]:
#meas_ctrl.elapsed_time
meas_ctrl.logic_end_time - meas_ctrl.logic_start_time - (1.0 / meas_ctrl.measurement.BNO_UPDATE_FREQUENCY_HZ)


10.000317573547363