In [1]:
from imu_bno import Imu

from adafruit_bno08x import BNO_REPORT_ACCELEROMETER, BNO_REPORT_GYROSCOPE, BNO_REPORT_MAGNETOMETER, BNO_REPORT_GRAVITY, BNO_REPORT_ROTATION_VECTOR, BNO_REPORT_GEOMAGNETIC_ROTATION_VECTOR
import busio

from multiprocessing import Process
from multiprocessing import Event as PEvent
from multiprocessing import Queue as PQueue

import multiprocessing as mp
import queue

import time

import cv2
from IPython.display import display, Image

```
Neural_EKF(
  (rnn): LSTM(9, 240, batch_first=True)
  (linear): Linear(in_features=240, out_features=3, bias=True)
)
```

In [2]:
import numpy as np
from matplotlib import pyplot as plt
# import pandas as pd
import torch
import torch.nn as nn

class Neural_EKF(nn.Module):
  def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.input_size = 9
    self.hidden_size_rnn = 240
    self.hidden_size_lin = self.hidden_size_rnn#*20
    self.output_size = 3
    self.n_layers = 1
    # Defining the RNN layer
    self.rnn = nn.LSTM(self.input_size, self.hidden_size_rnn, self.n_layers, batch_first=True)
    # Defining the linear layer
    self.linear = nn.Linear(self.hidden_size_lin, self.output_size)
    self.preLinear = nn.Linear(self.output_size, self.hidden_size_lin)
    self.loss_val = None

  def forward(self, input, target=None, device = 'cpu'):
    req_grad = True
    if (target is None):
      req_grad = False

    self.requires_grad_(req_grad)

    pre, seq = input

    h0 = torch.zeros(self.n_layers, seq.size(0), self.hidden_size_rnn, requires_grad=req_grad).float().to(device)
    c0 = nn.functional.relu(self.preLinear(pre)).view(self.n_layers, seq.size(0), self.hidden_size_rnn)
    out, hn = self.rnn(seq, (h0, c0))

    out = nn.functional.relu(out[:, -1])
    out = self.linear(out)
    return out


In [3]:
def read_process(dev, que):
    while True:
        try:
            accX, accY, accZ = dev.acceleration
            gyroX, gyroY, gyroZ = dev.gyro
            magnX, magnY, magnZ = dev.magnetic
    
            dat_r = [[accX, accY, accZ], [gyroX, gyroY, gyroZ], [magnX, magnY, magnZ]]
            # dat =  self.get_ekf_gravity(dat_r)
            # gX, gY, gZ = self.sensor.get_ekf_gravity(dat_r)
            que.put([accX, accY, accZ, gyroX, gyroY, gyroZ,magnX, magnY, magnZ])
            if que.full():
                que.get()
        except KeyboardInterrupt:
            pass

In [4]:
# model_len60 = Neural_EKF()
model_len60 = torch.load("LSTM60_direct.pt")
model_len60.eval()
i2c = busio.I2C((1, 14), (1, 15))
dev = Imu(i2c, address=0x4b)
dev.enable_feature(BNO_REPORT_GRAVITY)

In [5]:
def read_data_60hz(que, last_ts):
    item = None
    while que.empty():
        pass
    item =  que.get()
    end = time.time()
    #60Hz
    while ((end - last_ts) < 0.0166666667):
        end = time.time()
        pass
    last_ts = end
    # item.insert(0, (end + self.last_ts)/2)
    # self.last_ts = end
    # data.append(item)
    return item, last_ts


def get_neural_grav60(dev, que, gue_l):
    last_ts = time.time()
    # grav = np.array([0, 0, -9.8])
    while True:
        try:
            data = []
            for i in range(60):
                item, last_ts = read_data_60hz(que, last_ts)
                data.append(np.array(item))
            data = np.array(data)
            # print(data)
            gue_l.put(data)
            if gue_l.full():
                gue_l.get()
            # grav = grav + model_len60(torch.unsqueeze(torch.Tensor(data), 0)).squeeze(0).numpy()
            # print(grav)
        except KeyboardInterrupt:
            pass

def get_gravity(dev, que):
    while True:
        try:
            que.put(dev.get_ekf_gravity())
        except KeyboardInterrupt:
            pass

que = PQueue(3)
reader = Process(target=read_process, args=(dev, que,), daemon=True)
reader.start()

que_l = PQueue(3)
reader_l = Process(target=get_neural_grav60, args=(dev, que, que_l,), daemon=True)
reader_l.start()

que_g = PQueue(3)
r_g = Process(target=get_gravity, args=(dev, que_g,), daemon=True)
r_g.start()


		********** Packet *************
DBG::		 HEADER:
DBG::		 Data Len: 25
DBG::		 Channel: INPUT_SENSOR_REPORTS (3)
DBG::		 	Report Type: BASE_TIMESTAMP (0xfb)
DBG::		 	Sensor Report Type: GRAVITY(0x6)
DBG::		 Sequence number: 60

DBG::		 Data:
DBG::		[0x04] 0xFB 0x6D 0x00 0x00 
DBG::		[0x08] 0x00 0x06 0x89 0x03 
DBG::		[0x0C] 0x00 0x12 0x00 0x6A 
DBG::		[0x10] 0x00 0x9C 0x09 0x00 
DBG::		[0x14] 0x00 0x00 0x00 0x00 
DBG::		[0x18] 0x00 0x00 0x00 0x00 
DBG::		[0x1C] 0x00 
		*******************************



Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_288353/3612984087.py", line 40, in get_gravity
    que.put(dev.get_ekf_gravity())
  File "/root/Desktop/python/IMU-data-processing/imu_bno.py", line 206, in get_ekf_gravity
    r, p, y = self.get_ekf_angles(dat)
  File "/root/Desktop/python/IMU-data-processing/imu_bno.py", line 124, in get_ekf_angles
    gX, gY, gZ = self.gyro
  File "/root/Desktop/python/venv/lib/python3.9/site-packages/adafruit_circuitpython_bno08x-1.2.5-py3.9.egg/adafruit_bno08x/__init__.py", line 618, in gyro
    self._process_available_packets()
  File "/root/Desktop/python/venv/lib/python3.9/site-packages/adafruit_circuitpython_bno08x-1.2.5-py3.9.egg/adafruit_bno08x/__init__.py", line 803, in _process_available_packets
    self

In [6]:

display_handle=display(None, display_id=True)
last_ts = time.time()
try:   
    X = 400
    Y = 400
    G = 9.80665
    grav = np.array([0, 0, -G])
    while True:
        
        # g1b, g2b, g3b = que_g.get()#dev.gravity
        g1a, g2a, g3a = grav

        try:
            # dat = que_l.get(block=True, timeout=0.1)
            dat = []
            for i in range(60):
                item, last_ts = read_data_60hz(que, last_ts)
                dat.append(item)
            # grav = grav + model_len60(torch.unsqueeze(torch.Tensor(dat), 0)).squeeze(0).numpy()
            grav = model_len60([torch.unsqueeze(torch.Tensor(grav), 0), torch.unsqueeze(torch.Tensor(dat), 0)]).squeeze(0).numpy()
        except queue.Empty:
            pass

        # xb = int(((g1b + G) / (2 * G)) * X) - 1
        # yb = int(((-g2b + G) / (2 * G)) * Y) - 1

        xa = int(((g1a + G) / (2 * G)) * X) - 1
        ya = int(((-g2a + G) / (2 * G)) * Y) - 1
        
        image = np.ones((X, Y), dtype = np.uint8)
        image.fill(0)
        # for y_ in range(yb-5, yb+5):
        #     image[y_][xb-5:xb+5] = 255
        for y_ in range(ya-5, ya+5):
            image[y_][xa-5:xa+5] = 255
        image = cv2.cvtColor(image,cv2.COLOR_GRAY2RGB)
        # image = cv2.flip(image, 1)
        # print(image.shape, type(image))
        _, image = cv2.imencode('.jpeg', image)
        display_handle.update(Image(data=image.tobytes()))
        
except KeyboardInterrupt:
    display_handle.update(None)
    pass

None

In [None]:
que_l = PQueue(3)
reader_l = Process(target=get_neural_grav60, args=(dev, que, que_l,), daemon=True)
reader_l.start()
grav = np.array([0, 0, -9.8])
for j in range(20):
    # data = []
    # for i in range(60):
    #     item, last_ts = read_data_60hz(que, last_ts)
    #     data.append(np.array(item))
    # data = np.array(data)
    # # print(data)
    data = que_l.get()
    grav = grav + model_len60(torch.unsqueeze(torch.Tensor(data), 0)).squeeze(0).numpy()
    print(grav)

# Linear

In [6]:
class TrajRegression(nn.Module):
  def __init__(self, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
#     self.input_layer = nn.Linear(12, 9)
#     self.hidden = nn.Linear(9, 7)
#     self.hidden2 = nn.Linear(7, 5)
#     self.out_layer = nn.Linear(5, 3)
    self.input_layer = nn.Linear(12, 32)
    self.hidden_1 = nn.Linear(32, 16)
    self.hidden_2 = nn.Linear(16, 8)
    self.out_layer = nn.Linear(8, 3)

    self.lrelu = nn.LeakyReLU()
    self.loss_val = None

  def forward(self, input, target=None):
    if (target is None):
      self.requires_grad_(False)
    else:
      self.requires_grad_(True)

#     x = self.lrelu(self.input_layer(input))
#     x = self.lrelu(self.hidden(x))
#     x = self.lrelu(self.hidden2(x))
    x = self.lrelu(self.input_layer(input))
    x = self.lrelu(self.hidden_1(x))
    x = self.lrelu(self.hidden_2(x))
    output = self.out_layer(x)

    return output

In [7]:
model = torch.load("LIN.pt")
model.eval()
grav = [0, 0, -9.80665]
# last_ts = time.time()
# for i in range(100):
#     item, last_ts = read_data_60hz(que, last_ts)
#     item.append(grav[0])
#     item.append(grav[1])
#     item.append(grav[2])
#     grav = model(torch.unsqueeze(torch.Tensor(item), 0)).squeeze(0).numpy()
#     print(grav)

In [8]:

display_handle=display(None, display_id=True)
try:   
    X = 400
    Y = 400
    G = 9.80665
    grav = np.array([0, 0, -G])
    grav_60 = np.array([0, 0, -G])
    dat = []
    last_ts = time.time()
    while True:
        
        # g1b, g2b, g3b = que_g.get()#dev.gravity
       
        g1a, g2a, g3a = grav
        
        item, last_ts = read_data_60hz(que, last_ts)
        dat.append(item)
        item = np.array((list(grav) + list(item)))
        
        # item.insert(grav[0])
        # item.append(grav[1])
        # item.append(grav[2])
        if (len(dat)==60):
            # print(grav_60)
            grav_60 = model_len60([torch.unsqueeze(torch.Tensor(grav_60), 0), torch.unsqueeze(torch.Tensor(dat), 0)]).squeeze(0).numpy()
            dat = []
            grav = grav_60
        else:
            grav = model(torch.unsqueeze(torch.Tensor(item), 0)).squeeze(0).numpy()

        gtb1, gtb2, gtb3 = que_g.get()
        # try:
        #     dat = que_l.get(block=True, timeout=0.1)
        #     grav = grav + model_len60(torch.unsqueeze(torch.Tensor(dat), 0)).squeeze(0).numpy()
        # except queue.Empty:
        #     pass

        xb = int(((gtb1 + G) / (2 * G)) * X) - 1
        yb = int(((-gtb2 + G) / (2 * G)) * Y) - 1

        xa = int(((g1a + G) / (2 * G)) * X) - 1
        ya = int(((-g2a + G) / (2 * G)) * Y) - 1
        
        image = np.ones((X, Y), dtype = np.uint8)
        image.fill(0)
        for y_ in range(yb-5, yb+5):
            image[y_][xb-5:xb+5] = 100
        for y_ in range(ya-5, ya+5):
            image[y_][xa-5:xa+5] = 255
        image = cv2.cvtColor(image,cv2.COLOR_GRAY2RGB)
        # image = cv2.flip(image, 1)
        # print(image.shape, type(image))
        _, image = cv2.imencode('.jpeg', image)
        display_handle.update(Image(data=image.tobytes()))
        
except KeyboardInterrupt:
    display_handle.update(None)
    pass

None