In [47]:
from utils.hole_tracker_V3 import HoleTracker
import numpy as np

# Smoke test: build a tracker, feed it a short IMU sequence and a few detections,
# printing the detection buffer after every detection.
Tracker = HoleTracker(10, 10, 10, tiebreak_method="KDE-N100-BW0.2", update_method="AVG-N20")

# show how the two method strings were parsed into (name, N, bandwidth)
print(Tracker.TIEBREAK_METHOD, Tracker.TIEBREAK_N, Tracker.TIEBREAK_BW)
print(Tracker.UPDATE_METHOD, Tracker.UPDATE_N, Tracker.UPDATE_BW)
Tracker._kill_memory()
print("------------------------------------------------------------------- \n\n")

# (timestamp, first three entries of the 6-element sample); the last three entries are always zero
# NOTE(review): presumably the sample is a twist [v, w] - confirm against HoleTracker._add_imu_data
imu_samples = [
    ( 0, [1, 0, 0]),
    (10, [0, 1, 0]),
    (20, [0, 0, 1]),
    (30, [0, 1, 0]),
    (40, [1, 0, 0]),
    (50, [0, 1, 0]),
    (60, [0, 0, 1]),
]
for sample_ts, leading in imu_samples:
    Tracker._add_imu_data(sample_ts, leading + [0]*3)

# add detections at timestamps that fall between the imu samples and inspect the buffer each time
for detection_ts in (5, 24, 52):
    Tracker._add_p_detection(detection_ts, [0, 0, 0])
    print(Tracker._p_detection)


print(Tracker._p_detection["p"][1].T)

[0m[DEBUG]: initialized tracker![0m
KDE 100 0.2
AVG 20 None
[0m[DEBUG]: killed memory![0m
------------------------------------------------------------------- 


│              ts               │            p[0]            │            p[1]            │            p[2]            │
+-------------------------------+----------------------------+----------------------------+----------------------------+
│           5.000000            │          0.000000          │          0.000000          │          0.000000          │
│              ts               │            p[0]            │            p[1]            │            p[2]            │
+-------------------------------+----------------------------+----------------------------+----------------------------+
│           24.000000           │         -5.000000          │         -10.000000         │         -4.000000          │
│           24.000000           │          0.000000          │          0.000000          │          0.000000

In [30]:
# Fresh tracker to check how imu timestamps are stored.
Tracker = HoleTracker(10, 10, 10, tiebreak_method="KDE-N100-BW0.2", update_method="AVG-N20")

# four all-zero imu samples at t = 0, 10, 20, 30
for sample_ts in range(0, 40, 10):
    Tracker._add_imu_data(sample_ts, [0]*6)

# only the first three stored timestamps are shown
print(Tracker._imu_data["ts"][0:3, :])

[0m[DEBUG]: initialized tracker![0m
[[ 0.]
 [10.]
 [20.]]


In [27]:
# Prototype: select the imu samples needed to propagate a detection made at `old_ts`
# forward to the latest imu measurement, together with the matching time steps.
# column 0 is the timestamp, columns 1..6 are the imu sample
imu_data = np.array([
    [00, 0, 0, 1, 0, 0, 0],
    [10, 0, 1, 0, 0, 0, 0],
    [20, 1, 0, 0, 0, 0, 0],
    [30, 0, 1, 0, 0, 0, 0],
    [40, 0, 0, 1, 0, 0, 0],
])

old_ts = 21


# find the indices for the relevant window of imu data to propagate old detections forward
# (searchsorted gives the first sample at/after old_ts, so "- 1" is the last sample before it; clipped at 0)
sta_idx = np.clip(np.searchsorted(imu_data[:, 0], old_ts) - 1, a_min=0, a_max=None)
end_idx = len(imu_data) - 1 # always update the estimate up to the latest imu measurement

# make sure the window contains at least one interval
if sta_idx == end_idx:
    sta_idx = sta_idx - 1

print(f"start: {sta_idx} and end: {end_idx}")

# extract the relevant imu steps and corresponding delta_ts
relevant_imu      = imu_data[sta_idx:end_idx, 1:] # here the last one is actually not needed (later for estimate)
# copy the timestamps: a plain slice is a view, so patching its first entry would overwrite imu_data itself
# NOTE: the copy keeps imu_data's dtype, so a fractional old_ts is truncated if imu_data holds integers
relevant_ts       = imu_data[sta_idx:end_idx+1, 0:1].copy()
relevant_ts[0, 0] = old_ts # the first interval is only from old_ts to the next imu step!
delta_ts          = np.diff(relevant_ts, axis=0)


display(relevant_ts)
display(delta_ts)
display(relevant_imu)



start: 2 and end: 4


array([[21],
       [30],
       [40]])

array([[ 9],
       [10]])

array([[1, 0, 0, 0, 0, 0],
       [0, 1, 0, 0, 0, 0]])

In [72]:
from typing import Iterator
import time

class StructuredDeque:
    """
    a simple structured numpy array with deque behaviour: rows can be appended or prepended and,
    once maxlen rows are stored, adding a row on one end pushes a row off the other end.
    """

    def __init__(self, maxlen: int, datatype: list):
        """
        a class for a simple structured numpy array with deque behaviour.
        
        Args:
            maxlen: maximum length of the deque, after which elements are pushed off. 
                    with maxlen <= 0 nothing is ever stored.
            datatype: numpy datatype in the form of a list of tuples [("fieldname", np.dtype, shape_tuple), (...)]
        """
        
        self.MAXLEN = maxlen
        self.DTYPE  = datatype
        self._array = np.array([], dtype=self.DTYPE)
    
    def __getitem__(self, idx):
        """
        overload of __getitem__ for convenience. basically the same as calling StructuredDeque._array[idx]

        Args:
            idx: indices, either keywords of fields or index or slice
            
        Returns:
            whatever the underlying numpy array returns for idx
        """
        
        return self._array[idx]
       
    def __setitem__(self, idx, value):
        """
        overload of __setitem__ for assigning values to the contents of _array. basically the same as calling StructuredDeque._array[idx] = [---]
        
        Args:
            idx: can be either slice, index or fieldname
            value: the value(s) to assign
        """
        
        self._array[idx] = value
        
    def __repr__(self) -> str:
        """
        for displaying the object as a text table that is 120 characters wide, one column per scalar entry of each field.
        standard fallback for __str__!. display() calls this first though, print() only as fallback.
        """
        
        n_rows = len(self._array)
        
        # build the header labels and collect one 2D block of values per field
        header_exp = []
        blocks     = []
        for name in self._array.dtype.names:
            # number of scalar entries of this field (1 for plain scalars, product of the dims for sub-arrays)
            width = 1
            for dim in self._array.dtype[name].shape:
                width *= dim
            
            # repeat-expand the name for each "subfield" if the field is wider than 1
            if width > 1:
                header_exp += [f"{name}[{k}]" for k in range(width)]
            else:
                header_exp += [f"{name}"]
            
            # always 2D: scalar fields become one column, multi-dimensional sub-arrays are flattened
            blocks.append(self._array[name].reshape(n_rows, width))
        
        # collect all the data in one numpy array for convenience
        concat_data = np.concatenate(blocks, axis=1)
   
        # determine the cell width (one separator character per column plus the closing one)
        max_length = 120
        n_cols     = len(header_exp)
        base_width = int((max_length - n_cols - 1)/n_cols)
        
        def trunc_cell(content: str, w: int) -> str:
            # truncate the content of a cell so that it displays nicely
            return (content[:w-2] + " …") if (len(content) > w) else (content)
        
        # determine the leftover space that was lost because the division has a remainder
        total_string_len = (base_width + 1) * n_cols + 1
        leftover         = max_length - total_string_len
        
        # modify the first column to take up the leftover space (W is a list with the width of each column)
        W     = [base_width] * n_cols
        W[0] += leftover
        
        # build all the rows (use the list of widths to determine the width of each column)
        header_row    = "│" + "│".join([trunc_cell(f"{label:^{w}}", w) for w, label in zip(W, header_exp)]) + "│"
        separator_row = "+" + "+".join("-" * w for w in W) + "+"
        data_rows     = [
            "│" + "│".join([trunc_cell(f"{value:^{w}.6f}", w) for w, value in zip(W, row)]) + "│"
            for row in concat_data
        ]
        
        # build the table
        return "\n".join([header_row, separator_row] + data_rows)
        
    def __len__(self) -> int:
        """
        reimplementing the len method for convenience. avoids having to call len(structureddeque._array) every time
        """
        
        return len(self._array)
      
    def __iter__(self) -> Iterator[tuple]:
        """
        reimplementing the iter method for convenience. avoids having to call iter(structureddeque._array) every time
        """
        
        return iter(self._array)  
    
    def prepend(self, newline: tuple):
        """
        implements a "prepending" behaviour that works just like appending but from the other side. also pushes off elements when maxlen is reached!

        Args:
            newline: one new line of data to add. has to be a tuple which matches the original datatype!
        """
        
        # a deque without capacity cannot hold anything, so the new line is dropped
        if self.MAXLEN <= 0:
            return
        
        # convert first: if newline does not match the datatype, the stored data stays untouched
        record = np.array(newline, dtype=self.DTYPE)
        
        # behaviour when the array is not at full length yet
        if len(self._array) < self.MAXLEN:
            self._array = np.insert(self._array, 0, record)
            return
        
        # behaviour when the array is at full length: shift everything back by one (last row is pushed off)
        self._array[1:] = self._array[:-1].copy()
        self._array[0]  = record
    
    def append(self, newline: tuple):
        """
        basically works like the normal append of a collections.deque. appending until the maxlen is reached, then the oldest elements are pushed off.

        Args:
            newline: one new line of data to add. has to be a tuple which matches the original datatype!
        """
        
        # a deque without capacity cannot hold anything, so the new line is dropped
        if self.MAXLEN <= 0:
            return
        
        # convert first: if newline does not match the datatype, the stored data stays untouched
        record = np.array(newline, dtype=self.DTYPE)
        
        # behaviour when the array is not at full length yet
        if len(self._array) < self.MAXLEN:
            self._array = np.append(self._array, record)
            return
        
        # behaviour when the array is at full length: shift everything forward by one (first row is pushed off)
        self._array[:-1] = self._array[1:].copy()
        self._array[-1]  = record
      
    def clear(self):
        """
        empty the structured deque
        """
        
        self._array = np.array([], dtype=self.DTYPE)
 

# Benchmark: starting from one estimate at t = 1001, walk backwards through the imu samples
# and prepend the reconstructed older estimate for every step, timing the whole loop.
SEED = 0 # fixed seed so that the printed table is identical on every run
rng  = np.random.default_rng(SEED)

imu_data = StructuredDeque(maxlen=1000, datatype=[("ts", np.float64, (1, )), ("twist", np.float32, (6, ))])
p_estimate = StructuredDeque(maxlen=1000, datatype=[("ts", np.float64, (1, )), ("p", np.float32, (3, )), ("vp", np.float32, (3, ))])

# the single known (newest) estimate
p_estimate.append((1001, [0, 0, 0], [0, 0, 0]))

# 1000 random twists with timestamps 0..999
for i in range(1000):
    imu_data.append((i, rng.random(6)))



identity = np.eye(3, 3) # loop invariant
t0 = time.perf_counter()
for i in range(2, len(imu_data) + 1):
    # inverting [p_k+1 = p_k + vp_k*Δt], where [vp_k = -v_cam_k - w_cam_k x p_k] to find the old starting point 
    # => p_k = inv(I - Δt*Ω_k) @ (p_k+1 + Δt*v_cam_k), where Ω = "cross-product-matrix" for w_cam_k
    v_cam_older = imu_data[-i]["twist"][0:3]
    w_cam_older = imu_data[-i]["twist"][3:6]
    Omega_older = np.array([
        [              0, -w_cam_older[2],  w_cam_older[1]],
        [ w_cam_older[2],               0, -w_cam_older[0]],
        [-w_cam_older[1],  w_cam_older[0],               0],
    ], dtype=np.float32)
    # p_estimate holds i-1 rows at this point, so index -i+1 is its first (= oldest) row
    delta_t     = p_estimate[-i + 1]["ts"] - imu_data[-i]["ts"]
    p_newer     = p_estimate[-i + 1]["p"]
    # solve the linear system directly instead of forming the explicit inverse
    p_older     = np.linalg.solve(identity - delta_t*Omega_older, p_newer + delta_t*v_cam_older)
    vp_older    = - v_cam_older - np.cross(w_cam_older, p_older)
    
    # add the newly found old estimate to the list (but prepending to deque here!)
    p_estimate.prepend((imu_data[-i]["ts"], list(p_older), list(vp_older)))
t1 = time.perf_counter()
print(f"total time: {(t1-t0)*1000}ms")


print(p_estimate)


total time: 130.9869000033359ms
│       ts       │      p[0]      │      p[1]      │      p[2]      │     vp[0]      │     vp[1]      │     vp[2]      │
+----------------+----------------+----------------+----------------+----------------+----------------+----------------+
│    0.000000    │    0.448196    │    0.993071    │    5.832033    │   -0.018189    │    1.753805    │   -0.825885    │
│    1.000000    │    0.430008    │    2.746876    │    5.006148    │   -0.396188    │    1.075263    │   -1.228957    │
│    2.000000    │    0.033819    │    3.822139    │    3.777191    │    0.622977    │    0.442515    │   -1.049728    │
│    3.000000    │    0.656797    │    4.264655    │    2.727463    │    1.705253    │    0.028453    │   -1.599541    │
│    4.000000    │    2.362050    │    4.293108    │    1.127923    │   -0.091163    │   -0.827596    │    1.842769    │
│    5.000000    │    2.270887    │    3.465512    │    2.970692    │   -0.711293    │    1.504489    │   -2.769850    │
