In [None]:
import numpy as np
import os
import random
from multiprocessing import Pool

# 경로 설정
velo_base_path = '/media/ssd3/lab/sypark/KITTI/object/2d_object/data_object_velodyne/training/velodyne'  # 원본 데이터
label_base_path = '/media/ssd3/lab/sypark/KITTI/object/2d_object/data_object_label_2/training/label_2'  # 레이블 파일
calib_base_path = '/media/ssd3/lab/sypark/KITTI/object/2d_object/data_object_calib/training/calib'
output_base_path = '/media/ssd3/lab/jojeon/KITTI/data_spoofing/velodyne/train_add'  # 저장 경로

if not os.path.exists(output_base_path):
    os.makedirs(output_base_path)

print("Paths and directories set successfully!")


Paths and directories set successfully!


In [2]:
# .bin 파일을 읽는 함수
def load_bin_file(file_path):
    if os.path.exists(file_path):
        return np.fromfile(file_path, dtype=np.float32).reshape(-1, 4)
    else:
        print(f"LiDAR file not found: {file_path}")
        return None

# Calibration 파일에서 변환 행렬 읽기
def load_calibration(calib_path):
    with open(calib_path, 'r') as f:
        lines = f.readlines()

    def extract_matrix(line):
        return np.array(line.strip().split()[1:], dtype=np.float32).reshape(3, 4)

    Tr_velo_to_cam = extract_matrix(lines[5])
    R0_rect = np.array(lines[4].strip().split()[1:], dtype=np.float32).reshape(3, 3)
    Tr_velo_to_cam = np.vstack((Tr_velo_to_cam, [0, 0, 0, 1]))
    return Tr_velo_to_cam, R0_rect

# Label 파일에서 박스 정보 파싱
def parse_label_file(label_path):
    boxes = []
    with open(label_path, 'r') as f:
        for line in f:
            elements = line.strip().split()
            if elements[0] == 'DontCare':
                continue
            height, length, width = map(float, elements[8:11])
            x, y, z = map(float, elements[11:14])
            rotation_y = float(elements[14])
            boxes.append((x, y, z, length, width, height, rotation_y))
    return boxes

# 특정 Bounding Box 내부 포인트 필터링
def filter_points_in_box(points, box, Tr_cam_to_velo, R0_rect_4x4, scale_factor=1.0):
    x, y, z, length, width, height, rotation_y = box
    length *= scale_factor
    width *= scale_factor
    height *= scale_factor

    center_cam = np.array([x, y, z, 1])
    center_lidar = Tr_cam_to_velo @ R0_rect_4x4 @ center_cam

    rotation_matrix = np.array([
        [np.cos(rotation_y), -np.sin(rotation_y), 0],
        [np.sin(rotation_y), np.cos(rotation_y), 0],
        [0, 0, 1]
    ])

    points_shifted = points[:, :3] - center_lidar[:3]
    points_rotated = points_shifted @ rotation_matrix.T

    mask = (
        (np.abs(points_rotated[:, 0]) <= length / 2) &
        (np.abs(points_rotated[:, 1]) <= width / 2) &
        (points_rotated[:, 2] >= 0) & (points_rotated[:, 2] <= height)
    )
    return points[mask]

# 포인트 복사 및 이동 함수
def copy_and_translate_points(points, translation):
    copied_points = points.copy()
    copied_points[:, :3] += translation
    return copied_points

# 랜덤 이동 생성 함수
def generate_forward_translation(range_x=(2, 10), range_y=(-2, 2), range_z=(-0.5, 0.5)):
    return [
        random.uniform(*range_x),  # X축 이동
        random.uniform(*range_y),  # Y축 이동
        random.uniform(*range_z)   # Z축 이동
    ]

In [3]:
# LiDAR 데이터 처리 및 저장
def process_lidar_file(file_name):
    velo_file = os.path.join(velo_base_path, f"{file_name}.bin")
    label_file = os.path.join(label_base_path, f"{file_name}.txt")
    calib_file = os.path.join(calib_base_path, f"{file_name}.txt")
    output_dir = os.path.join(output_base_path, file_name)

    # 출력 경로 생성
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # 데이터 로드
    points = load_bin_file(velo_file)
    boxes = parse_label_file(label_file)
    Tr_velo_to_cam, R0_rect = load_calibration(calib_file)

    # 변환 행렬 계산
    Tr_cam_to_velo = np.linalg.inv(Tr_velo_to_cam)
    R0_rect_4x4 = np.vstack((np.hstack((R0_rect, np.zeros((3, 1)))), [0, 0, 0, 1]))

    # 객체별 포인트 필터링 및 이동 후 저장
    for i, box in enumerate(boxes):
        filtered_points = filter_points_in_box(points, box, Tr_cam_to_velo, R0_rect_4x4, scale_factor=1.0)
        if len(filtered_points) == 0:
            continue

        # 랜덤 이동 적용
        translation = generate_forward_translation()
        translated_points = copy_and_translate_points(filtered_points, translation)

        # 파일 저장
        output_file = os.path.join(output_dir, f"{file_name}-{i+1}.bin")
        translated_points.tofile(output_file)
        print(f"Saved: {output_file}")


In [5]:
from multiprocessing import Pool

if __name__ == "__main__":
    import glob
    import time

    # 파일 리스트 로드
    lidar_files = [os.path.basename(f).split(".")[0] for f in glob.glob(os.path.join(velo_base_path, "*.bin"))]
    print(f"Total files to process: {len(lidar_files)}")

    # 멀티프로세싱 실행
    start_time = time.time()
    with Pool(processes=100) as pool:  # 프로세스 수 설정
        pool.map(process_lidar_file, lidar_files)

    print(f"All files processed in {time.time() - start_time:.2f} seconds.")


Total files to process: 7481


FileNotFoundError: [Errno 2] No such file or directory: '/media/ssd3/lab/sypark/KITTI/object/2d_object/label_2/005030.txt'