In [1]:
import sys
import os

# notebooks 폴더의 상위 폴더(프로젝트 루트)를 경로에 추가합니다.
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.insert(0, project_root)

print(f"Project root added to path: {project_root}")


Project root added to path: c:\Users\KIST


In [None]:
from pathlib import Path
from site import check_enableusersite

import numpy as np
import matplotlib.pyplot as plt
import cv2
import rerun as rr
from rosbags.rosbag2 import Reader
from rosbags.serde import deserialize_cdr
from src.intrinsic_parameter import check_rosbag_path, get_camera_parameter
from src.rerun_blueprint import setup_rerun_blueprint, log_description



In [7]:
def remove_nan(pts, remove_nans=True):
    """ 포인트 클라우드 inf/nan 제거 """
    if remove_nans:
        # Check for both NaN & infinite values
        pts = pts[np.isfinite(pts).all(axis=1)]
    return pts


In [None]:
def rotate_pointcloud(pts):
    """x축 기준 -90도 회전, y축 기준 +90도 회전 """
    R = np.array([
        [0, 0, 1], 
        [-1, 0, 0], 
        [0, -1, 0]
        ], dtype=np.float32) 
    
    # rectified pointclouds
    pts_rct = pts @ R
    return pts_rct

In [None]:
def pointcloud2_to_xyz_numpy(msg):
    """ pointcloud2 to numpy array """
    def field_offset(name):
    # find offset
        for f in msg.fields:
            if f.name == name:
                return f.offset
        raise KeyError(f"field '{name}' not found")

    # pointcloud => numpy array
    # msg.fields에서 x/y/z의 offset을 찾기 > 각 포인트의 step을 이용해 바이트 버퍼 해석
    off_x = field_offset('x'); off_y = field_offset('y'); off_z = field_offset('z') # point data byte array
    step = msg.point_step # num of byte
    n = len(msg.data) // step # num of points
    buf = np.frombuffer(msg.data, dtype=np.uint8).reshape(n, step) # point data byte array

    # little-endian float32 / 4byte 부동소수점 숫자로 해석 
    x = buf[:, off_x:off_x+4].view('<f4').reshape(-1)  
    y = buf[:, off_y:off_y+4].view('<f4').reshape(-1)
    z = buf[:, off_z:off_z+4].view('<f4').reshape(-1)
    pts_xyz = np.column_stack((x, y, z))
    pts_xyz = remove_nan(pts_xyz, remove_nans=True)

    return pts_xyz

In [None]:
def rgb_to_numpy(msg):
    """ rgb to numpy array """
    def field_offset(name):
    # find offset
        for f in msg.fields:
            if f.name == name:
                return f.offset
        raise KeyError(f"field '{name}' not found")
    

    off_rgb = field_offset('rgb') # point data byte array
    step = msg.point_step # num of byte
    n = len(msg.data) // step # num of points
    buf = np.frombuffer(msg.data, dtype=np.uint8).reshape(n, step) # point data byte array
    rgb_u8 = buf[:, off_rgb:off_rgb+3] 
    
    b = rgb_u8[:, 0].astype(np.uint8)
    g = rgb_u8[:, 1].astype(np.uint8)
    r = rgb_u8[:, 2].astype(np.uint8)
    colors = np.stack([r, g, b], axis=1)
    
    return colors

In [None]:
def color_pointcloud_depthmap(pts: np.ndarray):
    """ point - rgb color match/ 포인트 원본 순서 유지한 상태에서 """

In [None]:
# extract intrinsic parameter of zed camera manually
def get_camera_parameter(bag_path: Path, camera_info_topic: str):
    TOPIC = camera_info_topic
 
    if not check_rosbag_path(bag_path) :
        return
    
    with Reader(bag_path) as reader:
        conns = [c for c in reader.connections 
                if c.topic == TOPIC and c.msgtype == "sensor_msgs/msg/CameraInfo"]
        for conn, timestamp, rawdata in reader.messages(conns):
            msg = deserialize_cdr(rawdata, conn.msgtype)
            break  # 첫 메시지 하나만 읽고 종료
    
    K = np.array(msg.k).reshape(3, 3)
    width = msg.width
    height = msg.height 
    cx, cy = K[0, 2], K[1, 2]
    fx, fy = K[0, 0], K[1, 1]

    print( width, height, cx, cy, fx, fy)

In [None]:
""" x, y, z 값이 어떻게 나오는지 확인 """
""" rgb 값이 어떻게 나오는지 확인하기 위한 용도"""
def run(bag_path: Path, config: dict):
    # 설정 파일에서 토픽 이름 가져오기
    image_topic = config["ros_topics"]["image"]
    depth_topic = config["ros_topics"]["depth"]
    camera_info_topic = config["ros_topics"]["camera_left_info"]
    point_cloud_topic = config["ros_topics"]["point_cloud"]
    
    # 설정 파일에서 시각화 파라미터 가져오기
    point_radius = config["visualization"]["point_radius"]
    app_id = f"zed_viewer_{bag_path.name}"

    with Reader(bag_path) as reader:
        # conns 리스트 생성 시 설정값 사용
        topics_to_read = [image_topic, depth_topic, point_cloud_topic]
        conns = [c for c in reader.connections if c.topic in topics_to_read]
        
        for conn, timestamp, rawdata in reader.messages(conns):
            msg = deserialize_cdr(rawdata, conn.msgtype)
            # pointcloud data logging
            if conn.topic == point_cloud_topic:
                
                # 메시지에 기록된 실제 해상도를 출력합니다.
                actual_width = msg.width
                actual_height = msg.height
                pts = pointcloud2_to_xyz_numpy(msg)
                
                if pts.size:
                    pts = rotate_pointcloud(pts) 
                    colors = rgb_to_numpy(msg)
                    # 포인트 클라우드의 전체 포인트 개수를 출력합니다.
                    print(f"이 메시지에 포함된 유효한 포인트의 개수: {len(pts)} 개") # 108339개
                    print(len(colors))
                
                    break # 첫 번째 포인트 클라우드 메시지만 처리하고 루프를 중단합니다.

In [None]:
project_root = Path.cwd()

if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

print(f"Project root added to path: {project_root}")

import argparse
import yaml  

def main():
    bag_path_str = "C:/rosbag2_2"  # 실제 rosbag 경로를 입력
    config_path_str = "config.yaml"

    bag_path = Path(bag_path_str)
    config_path = Path(config_path_str)

    # 설정 파일을 직접 읽어옵니다.
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    run(bag_path, config)
main()

Project root added to path: c:\Users\KIST\zed-rosbag-visualizer
이 메시지에 포함된 유효한 포인트의 개수: 104370 개
114688


explicit typestores.

If you are deserializing messages from an AnyReader instance, simply
use its `.deserialize(data, typename)` method.

Otherwise instantiate a type store and use its methods:

from rosbags.typesys import Stores, get_typestore

typestore = get_typestore(Stores.ROS2_FOXY)
typestore.deserialize_cdr(data, typename)
  msg = deserialize_cdr(rawdata, conn.msgtype)


In [11]:
def color_pointcloud_depthmap(pts: np.ndarray):
    # point - color match/ 포인트 원본 순서 유지한 상태에서
    depth = pts[:, 2]

    if depth.size == 0:
        return np.array([], dtype=np.uint8)

    d_min = depth.min()
    d_max = depth.max()

    if d_max == d_min:
        normalized_d = np.zeros_like(depth)
    else:
        # normalization
        normalized_d = (depth - d_min) / (d_max - d_min)

    cmap = plt.get_cmap("jet")  # 가까운 것: 빨강, 먼 것: 파랑
    colors_float = cmap(normalized_d)[:, :3] # cmap (R,G,B,A) > (R,G,B)
    return (colors_float * 255).astype(np.uint8) # 0~255 범위의 정수형
