In [8]:
# Install the protobuf compiler
!apt install protobuf-compiler

# Compile the proto file to generate Python classes
!protoc --python_out=. frame.proto


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
protobuf-compiler is already the newest version (3.12.4-1ubuntu7.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.
Could not make proto path relative: frame.proto: No such file or directory


In [11]:
import os
import sys
import numpy as np

# Assuming you've added the directory containing the generated Python module to the Python path
sys.path.append('/content/sample_data/data')

from frame_pb2 import Frame

def parse_file(pb_path):
    frame_data = Frame()
    data = None
    try:
        with open(pb_path, 'rb') as file:
            data = file.read()
            print("Data Length:", len(data))  # Debugging statement
        frame_data.ParseFromString(data)
    except Exception as e:
        print(f"Failed to parse {pb_path}: {str(e)}")
        return None, None, None

    if frame_data.lidars and frame_data.lidars[0].detections:
        first_detection = frame_data.lidars[0].detections[0]
        x = first_detection.pos[0] if len(first_detection.pos) > 0 else None
        y = first_detection.pos[1] if len(first_detection.pos) > 1 else None
        z = first_detection.pos[2] if len(first_detection.pos) > 2 else None
        return x, y, z
    return None, None, None

def parse_pb_files(directory_path):
    file_paths = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if f.endswith('.pb')]
    print("Processing files:", file_paths)  # Debugging statement
    positions = []
    for path in file_paths:
        pos = parse_file(path)
        if pos and pos[0] is not None:
            positions.append(pos)
    return np.array(positions)

def kalman_filter(z, dt):
    if z.size == 0:
        raise ValueError("No measurements available to process.")
    x = np.array([[z[0][0]], [z[0][1]], [0], [0]])
    P = np.eye(4)
    F = np.array([[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]])
    H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]])
    R = np.array([[0.1, 0], [0, 0.1]])
    Q = np.eye(4) * 0.01

    for i in range(1, len(z)):
        x = F @ x
        P = F @ P @ F.T + Q
        S = H @ P @ H.T + R
        K = P @ H.T @ np.linalg.inv(S)
        y = z[i] - (H @ x)
        x = x + (K @ y)
        P = (np.eye(4) - (K @ H)) @ P
    return x, P

directory_path = '/content/sample_data/data'  # Adjust as needed
measurements = parse_pb_files(directory_path)
dt = 1.0  # Time step between measurements

if measurements.size > 0:
    final_state, final_covariance = kalman_filter(measurements, dt)
    print("Final State:", final_state)
    print("Final Covariance Matrix:\n", final_covariance)
else:
    print("No data to process.")


Processing files: ['/content/sample_data/data/frame_109.pb', '/content/sample_data/data/frame_158.pb', '/content/sample_data/data/frame_50.pb', '/content/sample_data/data/frame_148.pb', '/content/sample_data/data/frame_142.pb', '/content/sample_data/data/frame_47.pb', '/content/sample_data/data/frame_180.pb', '/content/sample_data/data/frame_186.pb', '/content/sample_data/data/frame_159.pb', '/content/sample_data/data/frame_64.pb', '/content/sample_data/data/frame_62.pb', '/content/sample_data/data/frame_197.pb', '/content/sample_data/data/frame_3.pb', '/content/sample_data/data/frame_22.pb', '/content/sample_data/data/frame_8.pb', '/content/sample_data/data/frame_18.pb', '/content/sample_data/data/frame_114.pb', '/content/sample_data/data/frame_66.pb', '/content/sample_data/data/frame_160.pb', '/content/sample_data/data/frame_135.pb', '/content/sample_data/data/frame_137.pb', '/content/sample_data/data/frame_103.pb', '/content/sample_data/data/frame_28.pb', '/content/sample_data/data/