In [394]:
%serialconnect --port=COM6

[34mConnecting to --port=COM6 --baud=115200 [0m
[34mReady.
[0m

In [395]:
import machine
from machine import Pin, I2C, UART
import utime
import struct
import math

In [396]:
pin_mappings = {
    "D0": 16,
    "D1": 5,
    "D2": 4,
    "D3": 0,
    "D4": 2,
    "D5": 14,
    "D6": 12,
    "D7": 13,
    "D8": 15,
    "RX": 3,
    "TX": 1,
    "SD3": 10,
    "SD2": 9
}

In [397]:
sclk_pin = Pin(pin_mappings["D1"])
sdata_pin = Pin(pin_mappings["D2"])
i2c = I2C(-1, sclk_pin, sdata_pin, freq=400000)

In [398]:
print(hex(i2c.scan()[0]))

0x68


In [399]:
# Two addressing modes
# Write to internal register
# Read from register

# Single-byte write: 
# S, AD+W,    , RA,    , DATA,    , P
#  ,     , ACK,     ACK,     , ACK, 

# Burst write sequence: 
# S, AD+W,    , RA,    , DATA,    , DATA,    , P (Auto increment to next register)
#  ,     , ACK,   , ACK,     , ACK,     , ACK,

# Single-byte read: 
# S, AD+W,    , RA,    , S, AD+R,    ,     , NACK, P
#  ,     , ACK,   , ACK,  ,     , ACK, DATA,     ,  



# Burst read sequence: 
# S, AD+W,    , RA,    , S, AD+R,    ,     , ACK,     , NACK, P
#  ,     , ACK,   , ACK,  ,     , ACK, DATA,    , DATA, 

In [400]:
class MPU6050_Bus:
    def __init__(self, i2c):
        self.i2c = i2c
        self.addr = 0x68
        self.write_addr = self.addr << 1
        self.read_addr = (self.addr << 1) | (1 << 0)
        
        self.total_retries = 5
        self.retry_delay_ms = 100
    
    def write_bytes(self, register_addr, data):
        data = bytes(data)
        for _ in range(self.total_retries):
            try:
                self.i2c.start()
                self.assert_ack(self.i2c.write(bytes([self.write_addr, register_addr])), 2)
                self.assert_ack(self.i2c.write(data), len(data))
                self.i2c.stop()
                return
            except CommError as ex:
                utime.sleep_ms(self.retry_delay_ms)
                continue
        else:
            utime.sleep_ms(self.retry_delay_ms)
            raise Exception("Maximum retries reached")
        
    def read_bytes (self, register_addr, total_bytes=1):
        buffer = bytearray(total_bytes)
        
        for _ in range(self.total_retries):
            try:
                self.i2c.start()
                self.assert_ack(self.i2c.write(bytes([self.write_addr, register_addr])), 2)
                self.i2c.start()
                self.assert_ack(self.i2c.write(bytes([self.read_addr])))
                self.i2c.readinto(buffer)
                self.i2c.stop()
                return buffer
            except CommError as ex:
                utime.sleep_ms(self.retry_delay_ms)
                continue
        else:
            raise Exception("Reached maximum retries")
    
    def write_byte(self, register_addr, value):
        value = value & 0xFF
        self.write_bytes(register_addr, [value])
    
    def read_byte(self, register_addr):
        return self.read_bytes(register_addr, 1)[0]
    
    def assert_ack(self, n, expected=1):
        if n != expected:
            raise CommError("Failed acknowledge")

class CommError(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

In [401]:
class MPU6050:
    def __init__(self, bus):
        self.bus = bus
        
        self.gyro_fullscale_range = 0
        self.accel_fullscale_range = 0
        
        self.gyro_sensitivity = [131, 65.5, 32.8, 16.4]
        self.accel_sensitivity = [16384, 8192, 4096, 2048]
        
        self.set_clock_source(0x00)
        self.set_gyro_fullscale_range(0)
        self.set_accel_fullscale_range(0)
        self.set_sleep(False)
        
        
    def set_clock_source(self, source):
        data = self.bus.read_byte(0x6B)
        data = (data & 0xf8) | (source & 0x07)
        self.bus.write_byte(0x6B, data)
    
    # page 14
    def set_gyro_fullscale_range(self, mode):
        # 0 = 250 deg/s
        # 1 = 500
        # 2 = 1000
        # 3 = 2000
        mode = mode & 0x03
        data = self.bus.read_byte(0x1b)
        data = (data & int("11100111", 2)) | (mode << 3)
        self.bus.write_byte(0x1b, data)
        self.gyro_fullscale_range = mode
        
    # page 14
    def set_accel_fullscale_range(self, mode):
        # 0 = 2g
        # 1 = 4g
        # 2 = 8g
        # 3 = 16g
        mode = mode & 0x03
        data = self.bus.read_byte(0x1c)
        data = (data & int("11100111", 2)) | (mode << 3)
        self.bus.write_byte(0x1c, data)
        self.accel_fullscale_range = mode
            
    def set_sleep(self, is_sleep):
        data = self.bus.read_byte(0x6B)
        if not is_sleep:
            self.bus.write_byte(0x6B, data & ~(1 << 6))
        else:
            self.bus.write_byte(0x6B, data | (1 << 6))
            
    def is_ready(self):
        data = self.bus.read_byte(0x3a)
        if data & (1 << 0):
             return True
        return False
            
    def get_acceleration_data(self):
        if not self.is_ready():
            raise NotReady("Device is not ready!")
        
        data = self.bus.read_bytes(0x3B, 14)
        acc_x, acc_y, acc_z, temperature, gyro_x, gyro_y, gyro_z = struct.unpack(">hhhhhhh", data) 
        
        accel_shift = self.accel_sensitivity[self.accel_fullscale_range]
        acc_x /= accel_shift
        acc_y /= accel_shift
        acc_z /= accel_shift
        
        gyro_divider = self.gyro_sensitivity[self.gyro_fullscale_range]
        gyro_x /= gyro_divider
        gyro_y /= gyro_divider
        gyro_z /= gyro_divider
        
        return ((acc_x, acc_y, acc_z), (gyro_x, gyro_y, gyro_z))
    
class NotReady(Exception):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

In [402]:
class AccelerationCalculator:
    def __init__(self, sensor):
        self.sensor = sensor
        
        self.gyro_errors = [0, 0, 0]
        self.accel_errors = [0, 0, 0] 
        
        self.gyro_angles = [0, 0, 0]
        self.euler_angles = [0, 0, 0]
        
        self.last_read_time_ms = None
        
    def calibrate(self, euler_angles=[0, 0, 0], gyro_reference=[0, 0, 0], accel_reference=[0, 0, 9.8], total_samples=1000):
        
        self.euler_angles = euler_angles
        self.gyro_angles = euler_angles
        
        self.accel_angles = [0, 0] # x and y
        
        accel_values = [0, 0, 0]
        gyro_values = [0, 0, 0]
        
        for _ in range(total_samples):
            accel, gyro = self.sensor.get_acceleration_data()
            for axis in range(3):
                accel_values[axis] += accel[axis]
                gyro_values[axis] += gyro[axis]
        
        # average offsets
        for axis in range(3):
            accel_values[axis] /= total_samples
            gyro_values[axis] /= total_samples
        
        print((accel_values, gyro_values))
        
        for axis in range(3):
            # later we do reading -= error
            # reading = measured - error
            # error = measured - reading
            
            # in this instance, measured = raw, reading = target
            # error = raw - target
            self.accel_errors[axis] = accel_values[axis] - accel_reference[axis]
            self.gyro_errors[axis] = gyro_values[axis] - gyro_reference[axis]
            
    def calculate_acceleration_angles(self, accel):
        x = (math.atan(accel[1]) / math.sqrt(accel[0]**2 + accel[2]**2)) * 180/math.pi
        y = (math.atan(-accel[0]) / math.sqrt(accel[1]**2 + accel[2]**2)) * 180/math.pi
        return (x, y)
    
    # calibrated
    def get_acceleration_data(self):
        accel_values, gyro_values = self.sensor.get_acceleration_data()
        
        accel_values = [accel_values[i]-self.accel_errors[i] for i in range(3)]
        gyro_values = [gyro_values[i]-self.gyro_errors[i] for i in range(3)]
            
        return (accel_values, gyro_values)
    
    def get_euler_angles(self):
        accel_values, gyro_values = self.get_acceleration_data()
        
        current_time_ms = utime.ticks_ms()
        
        if self.last_read_time_ms is None:
            self.last_read_time_ms = current_time_ms
        
        delta_time_ms = current_time_ms - self.last_read_time_ms
        self.last_read_time_ms = current_time_ms
        
        delta_time_s = delta_time_ms / 1000
        
        accel_angle_x, accel_angle_y = self.calculate_acceleration_angles(accel_values)
        
        # given as roll, pitch, yaw
        for i in range(3):
            self.gyro_angles[i] += gyro_values[i] * delta_time_s
        
        self.euler_angles[0] += gyro_values[2] * delta_time_s # yaw
        self.euler_angles[1] = 0.96 * self.gyro_angles[0] + 0.04 * accel_angle_x # roll
        self.euler_angles[2] = 0.96 * self.gyro_angles[1] + 0.04 * accel_angle_y # pitch
        
        return self.euler_angles

In [403]:
def hex_string(data):
    return ' '.join(('0x{:02x}'.format(d) for d in data))

In [404]:
bus = MPU6050_Bus(i2c) 
sensor = MPU6050(bus)

In [405]:
calculator = AccelerationCalculator(sensor)

In [406]:
sensor.set_accel_fullscale_range(3) # measure up to 16g
sensor.set_gyro_fullscale_range(3) # 2000 deg/s

In [407]:
calculator.calibrate(accel_reference=[1, 0, 0])
print(calculator.gyro_errors)
print(calculator.accel_errors)

([1.00628, -0.0150806, -0.00312793], [-3.99322, 0.800728, 2.03073])
[-3.99322, 0.800728, 2.03073]
[0.00627804, -0.0150806, -0.00312793]


In [408]:
bluetooth = UART(1, 115200)
bluetooth.init(115200, bits=8, parity=None, stop=1)

In [409]:
header = 0xba41
while True:
    try:
        accel, gyro = calculator.get_acceleration_data()
        read_time = utime.ticks_ms()
        data = struct.pack(">HLffffff", header, read_time, accel[0], accel[1], accel[2], gyro[0], gyro[1], gyro[2])
        bluetooth.write(data)
        utime.sleep_ms(1)
    except NotReady as ex:
        print(ex)
        utime.sleep_ms(10)

............................................................................................................................................................................................................................................................................................................................................
**[ys] <class 'serial.serialutil.SerialException'>
**[ys] GetOverlappedResult failed (PermissionError(13, 'Access is denied.', None, 5))


**[ys] <class 'serial.serialutil.SerialException'>
**[ys] ClearCommError failed (PermissionError(13, 'Access is denied.', None, 5))

