In [None]:
# Install the required libraries
!pip install numpy open3d scikit-learn scipy tqdm



In [None]:
# 필요한 라이브러리 임포트
import numpy as np
import open3d as o3d
from sklearn.cluster import DBSCAN
from scipy.spatial import KDTree
import urllib.request
import struct
from tqdm import tqdm
import zipfile
from pathlib import Path
import time
import shutil
import matplotlib.pyplot as plt
from typing import List, Dict, Tuple, Optional, Any
from IPython.display import Image, display

In [None]:
# 1. ParkingSpaceSegmentation class definition
class ParkingSpaceSegmentation:
    """Segment parking-space candidates out of a 3-D point cloud.

    The pipeline is: preprocessing -> RANSAC ground removal -> DBSCAN
    clustering in the XY plane -> PCA-aligned bounding box per cluster.
    """

    def __init__(self, min_points: int = 100, eps: float = 0.3,
                 ground_height_threshold: float = 0.3,
                 min_space_width: float = 2.5, min_space_length: float = 5.0):
        """
        Args:
            min_points: Minimum number of points for a DBSCAN cluster.
            eps: DBSCAN neighbourhood distance threshold.
            ground_height_threshold: Height threshold used for ground detection.
            min_space_width: Minimum parking-space width.
            min_space_length: Minimum parking-space length.
        """
        self.min_points = min_points
        self.eps = eps
        self.ground_height_threshold = ground_height_threshold
        self.min_space_width = min_space_width
        self.min_space_length = min_space_length

    def preprocess_pointcloud(self, points: np.ndarray) -> np.ndarray:
        """Filter and down-sample a point cloud.

        Args:
            points: Input point cloud (N x 3).

        Returns:
            The preprocessed point cloud. An empty (0 x 3) array is returned
            for empty input; the unmodified input is returned when Open3D
            processing fails.
        """
        if len(points) == 0:
            return np.empty((0, 3))

        try:
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(points)

            # Height-based filtering: keep only points with z < 30.
            xyz = np.asarray(pcd.points)
            pcd.points = o3d.utility.Vector3dVector(xyz[xyz[:, 2] < 30])

            # Statistical outlier removal.
            pcd, _ = pcd.remove_statistical_outlier(
                nb_neighbors=20,
                std_ratio=2.0
            )

            # Voxel down-sampling.
            pcd = pcd.voxel_down_sample(voxel_size=0.1)

            # Normals used to be estimated here, but only the coordinates are
            # returned, so that work was discarded and has been dropped.
            return np.asarray(pcd.points)

        except Exception as e:
            print(f"Error in point cloud preprocessing: {e}")
            return points

    def extract_ground_plane(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]:
        """Split a point cloud into ground and non-ground points.

        Args:
            points: Input point cloud (N x 3).

        Returns:
            (ground points, non-ground points, plane model parameters).
            The non-ground points are additionally restricted to those lying
            higher than the median ground height plus the ground threshold.
            On failure the result is (empty, all input points, None).
        """
        if len(points) == 0:
            return np.empty((0, 3)), np.empty((0, 3)), None

        try:
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(points)

            # RANSAC ground detection.
            plane_model, inliers = pcd.segment_plane(
                distance_threshold=self.ground_height_threshold,
                ransac_n=3,
                num_iterations=1000
            )

            # Ground / non-ground split through a boolean mask.
            ground_mask = np.zeros(len(points), dtype=bool)
            ground_mask[inliers] = True
            ground_points = points[inliers]
            non_ground_points = points[~ground_mask]

            # Additional height-based filtering of the non-ground points.
            height_threshold = np.median(ground_points[:, 2]) + self.ground_height_threshold
            filtered_non_ground = non_ground_points[non_ground_points[:, 2] > height_threshold]

            return ground_points, filtered_non_ground, plane_model

        except Exception as e:
            print(f"Error in ground plane extraction: {e}")
            return np.empty((0, 3)), points, None

    def _analyze_cluster(self, cluster_points: np.ndarray) -> Optional[Dict[str, Any]]:
        """Fit a PCA-aligned box to one cluster and describe it.

        Args:
            cluster_points: Points of a single cluster (M x 3).

        Returns:
            A dict with 'center', 'dimensions', 'angle', 'points',
            'occupancy' and 'confidence', or None when the cluster has no
            clear orientation or fails the size validation.
        """
        xy = cluster_points[:, :2]

        # PCA on the XY footprint; eigh returns eigenvalues in ascending order.
        eigenvalues, eigenvectors = np.linalg.eigh(np.cov(xy.T))

        # Reject clusters without a clear dominant direction.
        if eigenvalues[1] < 2 * eigenvalues[0]:
            return None

        angle = float(np.arctan2(eigenvectors[1, 1], eigenvectors[0, 1]))
        rotation_matrix = np.array([
            [np.cos(angle), -np.sin(angle)],
            [np.sin(angle), np.cos(angle)]
        ])

        # Row-vector convention: this expresses the points in the frame whose
        # x axis is the principal direction.
        rotated_points = np.dot(xy, rotation_matrix)
        min_bound = np.min(rotated_points, axis=0)
        max_bound = np.max(rotated_points, axis=0)

        dimensions = np.array([
            max_bound[0] - min_bound[0],
            max_bound[1] - min_bound[1],
            np.max(cluster_points[:, 2]) - np.min(cluster_points[:, 2])
        ])

        if not self.is_valid_parking_space(dimensions):
            return None

        # Back to the original frame. The forward transform was p @ R, so the
        # inverse for a single vector is R @ c (the previous R.T @ c rotated
        # the centre a second time instead of undoing the rotation).
        center_2d = np.dot(rotation_matrix, (min_bound + max_bound) / 2)
        center = np.array([center_2d[0], center_2d[1], np.mean(cluster_points[:, 2])])

        # Occupancy: fraction of box grid cells that contain a point. Cell
        # indices are taken relative to the box corner and clipped to the
        # grid so the ratio always stays within [0, 1].
        cell_size = 0.2
        grid_w = max(1, int(np.ceil(dimensions[0] / cell_size)))
        grid_h = max(1, int(np.ceil(dimensions[1] / cell_size)))
        cells = np.floor((rotated_points - min_bound) / cell_size).astype(int)
        cells = np.minimum(cells, np.array([grid_w - 1, grid_h - 1]))
        occupied_cells = len(np.unique(cells, axis=0))

        # Guard the ratio against a zero (or slightly negative) minor eigenvalue.
        confidence = float(eigenvalues[1] / max(eigenvalues[0], 1e-12))

        return {
            'center': center,
            'dimensions': dimensions,
            'angle': angle,
            'points': cluster_points,
            'occupancy': occupied_cells / (grid_w * grid_h),
            'confidence': confidence
        }

    def detect_parking_spaces(self, points: np.ndarray) -> List[Dict[str, Any]]:
        """Detect parking spaces.

        Args:
            points: Preprocessed point cloud (N x 3).

        Returns:
            List of detected parking-space descriptions (see
            `_analyze_cluster` for the keys). Empty on failure.
        """
        if len(points) == 0:
            return []

        try:
            # Ground separation.
            _, objects, _ = self.extract_ground_plane(points)
            if len(objects) < self.min_points:
                return []

            # DBSCAN clustering on the XY coordinates.
            clustering = DBSCAN(eps=self.eps, min_samples=self.min_points).fit(objects[:, :2])
            labels = clustering.labels_

            parking_spaces = []
            for label in np.unique(labels):
                if label == -1:  # DBSCAN noise label
                    continue

                cluster_points = objects[labels == label]
                if len(cluster_points) < self.min_points:
                    continue

                try:
                    space = self._analyze_cluster(cluster_points)
                except np.linalg.LinAlgError:
                    continue
                except Exception as e:
                    print(f"Error processing cluster: {e}")
                    continue

                if space is not None:
                    parking_spaces.append(space)

            return parking_spaces

        except Exception as e:
            print(f"Error in parking space detection: {e}")
            return []

    def monitor_changes(self, previous_spaces: List[Dict[str, Any]],
                        current_points: np.ndarray) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Detect changes between the previous spaces and the current frame.

        Args:
            previous_spaces: Parking spaces of the previous frame.
            current_points: Point cloud of the current frame.

        Returns:
            (current parking spaces, list of change records). A current space
            whose centre lies within 1.0 of a previous centre is treated as
            the same space; otherwise it is reported as 'new_space'.
        """
        try:
            current_spaces = self.detect_parking_spaces(current_points)
            if not previous_spaces or not current_spaces:
                return current_spaces, []

            changes = []
            prev_centers = np.array([space['center'] for space in previous_spaces])
            tree = KDTree(prev_centers)

            for i, current_space in enumerate(current_spaces):
                try:
                    dist, idx = tree.query(current_space['center'])
                    idx = int(idx)

                    if dist < 1.0:  # considered the same space
                        prev_space = previous_spaces[idx]

                        occupancy_change = current_space['occupancy'] - prev_space['occupancy']
                        confidence_change = (current_space['confidence'] -
                                             prev_space.get('confidence', 1.0))

                        if abs(occupancy_change) > 0.3 or abs(confidence_change) > 0.5:
                            changes.append({
                                'space_id': idx,
                                'change_type': 'occupancy_change',
                                'previous_occupancy': prev_space['occupancy'],
                                'current_occupancy': current_space['occupancy'],
                                'occupancy_change': occupancy_change,
                                'confidence_change': confidence_change
                            })
                    else:
                        changes.append({
                            'space_id': len(previous_spaces) + len(changes),
                            'change_type': 'new_space',
                            'center': current_space['center'],
                            'occupancy': current_space['occupancy'],
                            'confidence': current_space['confidence']
                        })

                except Exception as e:
                    print(f"Error processing space {i}: {e}")
                    continue

            return current_spaces, changes

        except Exception as e:
            print(f"Error in change monitoring: {e}")
            return [], []

    def is_valid_parking_space(self, dimensions: np.ndarray) -> bool:
        """Check whether a box footprint is large enough to be a parking space.

        The two footprint extents may be given in either order: the shorter
        one is compared against the minimum width and the longer one against
        the minimum length. (The detector passes the PCA major axis first, so
        a fixed width-then-length order rejected correctly sized boxes.)

        Args:
            dimensions: Box extents; the first two entries are the footprint.

        Returns:
            True when the footprint meets both minimum sizes and its
            short/long ratio is at least 0.2.
        """
        try:
            width, length = sorted((float(dimensions[0]), float(dimensions[1])))

            if width < self.min_space_width or length < self.min_space_length:
                return False
            if length <= 0:
                return False

            # Reject extremely elongated footprints.
            return width / length >= 0.2

        except Exception as e:
            print(f"Error validating parking space dimensions: {e}")
            return False

In [None]:
# 2. KittiDataLoader class definition
class KittiDataLoader:
    """Download and read Velodyne scans of the KITTI odometry data (sequence 00)."""

    # One Velodyne record on disk: x, y, z, intensity as four native float32.
    _RECORD_BYTES = 16
    # Chunk size used while streaming the archive to disk.
    _DOWNLOAD_CHUNK_BYTES = 1 << 20

    def __init__(self, base_path: str = "./kitti_data"):
        """
        Args:
            base_path: Directory under which the dataset is stored.
        """
        self.base_path = Path(base_path)
        self.velodyne_path = self.base_path / "dataset" / "sequences" / "00" / "velodyne"

    def check_data_exists(self) -> bool:
        """Return True when the velodyne directory holds at least one .bin file."""
        return self.velodyne_path.exists() and any(self.velodyne_path.glob("*.bin"))

    def get_free_space(self, path: Path) -> int:
        """Return the free disk space (bytes) at `path`, creating it if needed."""
        path.mkdir(parents=True, exist_ok=True)
        return shutil.disk_usage(str(path)).free

    def _download_file(self, url: str, output_path: Path, desc: str) -> None:
        """Stream `url` to `output_path` with a progress bar.

        A single request is used for both the size check and the transfer,
        and a partially written file is removed when the transfer fails.
        """
        with urllib.request.urlopen(url) as response:
            # The header may be absent; then the size check is skipped and
            # the progress bar has no total.
            length_header = response.headers.get('Content-Length')
            total_size = int(length_header) if length_header is not None else None

            if total_size is not None:
                free_space = self.get_free_space(self.base_path)
                if total_size > free_space:
                    raise IOError(
                        f"Not enough disk space. Need {total_size/1e9:.1f}GB, "
                        f"but only {free_space/1e9:.1f}GB available"
                    )

            try:
                with open(output_path, "wb") as out_file, \
                        tqdm(total=total_size, unit='B', unit_scale=True, desc=desc) as pbar:
                    for chunk in iter(lambda: response.read(self._DOWNLOAD_CHUNK_BYTES), b""):
                        out_file.write(chunk)
                        pbar.update(len(chunk))
            except BaseException:
                # Never leave a truncated archive behind: a later call would
                # otherwise mistake it for a finished download.
                if output_path.exists():
                    output_path.unlink()
                raise

    def _extract_archive(self, zip_path: Path) -> None:
        """Extract every member of `zip_path` into the base path (best effort)."""
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for member in tqdm(zip_ref.namelist(), desc="Extracting"):
                try:
                    zip_ref.extract(member, self.base_path)
                except Exception as e:
                    print(f"Error extracting {member}: {e}")
                    continue

    def download_sample_data(self, force_download: bool = False) -> bool:
        """Download and extract the dataset.

        Args:
            force_download: Download even when the data already exists.

        Returns:
            True on success (or when the data is already present), False
            otherwise. An archive already present on disk is extracted
            instead of being silently skipped.
        """
        try:
            if self.check_data_exists() and not force_download:
                print("KITTI dataset already exists. Skipping download.")
                return True

            base_url = "https://s3.eu-central-1.amazonaws.com/avg-kitti"
            files = ["data_odometry_velodyne.zip"]

            self.base_path.mkdir(parents=True, exist_ok=True)

            for file in files:
                output_path = self.base_path / file

                try:
                    if not output_path.exists() or force_download:
                        print(f"Downloading {file}...")
                        self._download_file(f"{base_url}/{file}", output_path, file)

                    # Extraction.
                    print(f"\nExtracting {file}...")
                    self._extract_archive(output_path)

                    output_path.unlink()  # delete the archive

                except urllib.error.URLError as e:
                    print(f"Download failed: {e}")
                    return False
                except zipfile.BadZipFile:
                    print(f"Corrupt zip file: {file}")
                    if output_path.exists():
                        output_path.unlink()
                    return False
                except Exception as e:
                    print(f"Unexpected error: {e}")
                    return False

            print("Download and extraction complete!")
            return True

        except Exception as e:
            print(f"Failed to download/extract dataset: {e}")
            return False

    def read_velodyne_bin(self, bin_path: Path) -> np.ndarray:
        """Read a KITTI velodyne point cloud file.

        The file is parsed in one pass as native float32 records of
        (x, y, z, intensity); the intensity column is dropped.

        Args:
            bin_path: Path of the .bin file.

        Returns:
            Point cloud array (N x 3, float64). An empty array is returned
            when the file is missing or unreadable.
        """
        try:
            raw = Path(bin_path).read_bytes()

            # A trailing partial record is ignored, with a warning.
            remainder = len(raw) % self._RECORD_BYTES
            if remainder:
                print(f"Warning: Incomplete data in {bin_path}")

            values = np.frombuffer(raw, dtype=np.float32,
                                   count=(len(raw) - remainder) // 4)
            return values.reshape(-1, 4)[:, :3].astype(np.float64)

        except FileNotFoundError:
            print(f"File not found: {bin_path}")
            return np.array([])
        except Exception as e:
            print(f"Error reading file: {e}")
            return np.array([])

    def load_frames(self, num_frames: int = 5) -> List[np.ndarray]:
        """Load the first `num_frames` scans (in sorted file-name order).

        Args:
            num_frames: Number of frames to load.

        Returns:
            List of point cloud frames; files without points are skipped.
        """
        frames = []

        try:
            if not self.check_data_exists():
                print("KITTI dataset not found. Please download first.")
                return frames

            bin_files = sorted(self.velodyne_path.glob("*.bin"))

            if not bin_files:
                print("No point cloud files found.")
                return frames

            # Clamp to the available range; a negative count loads nothing.
            num_frames = max(0, min(num_frames, len(bin_files)))

            for bin_file in tqdm(bin_files[:num_frames], desc="Loading frames"):
                points = self.read_velodyne_bin(bin_file)
                if len(points) > 0:
                    frames.append(points)
                else:
                    print(f"Warning: No points in {bin_file}")

            if not frames:
                print("Warning: No valid frames loaded")
            else:
                print(f"Successfully loaded {len(frames)} frames")

        except Exception as e:
            print(f"Error loading frames: {e}")

        return frames

In [None]:
# 3. ParkingSpaceVisualization class definition
class ParkingSpaceVisualization:
    """Render point clouds and detected parking spaces.

    An Open3D visualizer is used when it can be created; otherwise
    `save_screenshot` falls back to a 2-D matplotlib plot of the last frame.
    """

    # RGB triples for the colour names returned by `_get_color`.
    _OCCUPANCY_RGB = {
        'red': [1, 0, 0],       # occupied
        'orange': [1, 0.5, 0],  # partially occupied
        'green': [0, 1, 0],     # empty
    }

    def __init__(self, headless: bool = True):
        """
        Args:
            headless: Whether to create the Open3D window invisibly.
        """
        self.vis = None
        self.headless = headless
        # Last frame passed to `visualize_frame`; used by the matplotlib fallback.
        self.current_points = None
        self.current_spaces = None

        try:
            if not headless:
                # GUI mode initialisation.
                # NOTE(review): kept from the original; confirm that this GUI
                # application setup is needed for the legacy Visualizer.
                if not hasattr(o3d.visualization.gui.Application, '_instance'):
                    o3d.visualization.gui.Application.instance.initialize()

            vis = o3d.visualization.Visualizer()
            # create_window reports failure through its return value; without
            # this check the failure only surfaced later as a None render option.
            if not vis.create_window(visible=not headless, width=1280, height=720):
                raise RuntimeError("could not create an Open3D rendering window")

            render_option = vis.get_render_option()
            if render_option is None:
                raise RuntimeError("Open3D render options are unavailable")
            render_option.background_color = np.asarray([0.1, 0.1, 0.1])
            render_option.point_size = 2.0

            view_control = vis.get_view_control()
            view_control.set_zoom(0.7)
            view_control.set_lookat([0, 0, 0])
            view_control.set_up([0, 0, 1])

            self.vis = vis

        except Exception as e:
            print(f"Failed to initialize visualizer: {e}")
            print("Running in pure headless mode...")
            self.vis = None

    def create_parking_space_mesh(self, center: np.ndarray,
                                  dimensions: np.ndarray,
                                  angle: float,
                                  occupancy: float) -> "o3d.geometry.TriangleMesh":
        """Create a coloured box mesh for one parking space.

        Args:
            center: Box centre (x, y, z).
            dimensions: Box extents; the first two entries are the footprint.
            angle: Rotation about the z axis in radians.
            occupancy: Occupancy ratio, used to pick the colour.

        Returns:
            A 0.2-deep box centred on `center` and rotated by `angle`.
        """
        box = o3d.geometry.TriangleMesh.create_box(
            width=float(dimensions[0]),
            height=float(dimensions[1]),
            depth=0.2
        )

        # create_box puts one corner at the origin: centre the box on the
        # origin, rotate about the origin, then move it to its final place.
        # (`rotate(R, center=True)` is rejected by current Open3D releases,
        # where `center` must be a 3-vector.)
        half_extent = np.array([dimensions[0] / 2, dimensions[1] / 2, 0.1], dtype=float)
        box.translate(-half_extent)
        R = box.get_rotation_matrix_from_xyz((0, 0, angle))
        box.rotate(R, center=np.zeros(3))
        box.translate(np.asarray(center, dtype=float))

        # Colour by occupancy state, sharing the thresholds of `_get_color`.
        box.paint_uniform_color(self._OCCUPANCY_RGB[self._get_color(occupancy)])
        return box

    def visualize_frame(self, points: np.ndarray,
                        parking_spaces: Optional[List[Dict[str, Any]]] = None):
        """Show one frame: the point cloud plus a box per parking space.

        The frame is always remembered for `save_screenshot`, even when no
        Open3D visualizer is available.
        """
        # Store the current data.
        self.current_points = points
        self.current_spaces = parking_spaces

        if self.vis is None:
            return

        try:
            self.vis.clear_geometries()

            # Point cloud.
            pcd = o3d.geometry.PointCloud()
            pcd.points = o3d.utility.Vector3dVector(points)
            pcd.paint_uniform_color([0.7, 0.7, 0.7])
            self.vis.add_geometry(pcd)

            for space in parking_spaces or []:
                try:
                    box = self.create_parking_space_mesh(
                        space['center'],
                        space['dimensions'],
                        space.get('angle', 0),
                        space.get('occupancy', 0)
                    )
                    self.vis.add_geometry(box)
                except Exception as e:
                    print(f"Error visualizing parking space: {e}")
                    continue

            self.vis.poll_events()
            self.vis.update_renderer()

        except Exception as e:
            print(f"Error in visualization: {e}")

    def save_screenshot(self, filename: str):
        """Save the current view as an image file.

        Uses the Open3D window when available, otherwise draws a top-down
        matplotlib plot of the last visualized frame.
        """
        try:
            if self.vis is not None:
                self.vis.capture_screen_image(filename)
                return

            if self.current_points is None or len(self.current_points) == 0:
                print("Error saving screenshot: no frame has been visualized yet")
                return

            # matplotlib fallback.
            plt.figure(figsize=(12, 8))
            try:
                plt.scatter(self.current_points[:, 0],
                            self.current_points[:, 1],
                            c='gray', s=1, alpha=0.5)

                for space in self.current_spaces or []:
                    # Outline of the parking space.
                    corners = self._get_box_corners(
                        space['center'], space['dimensions'], space.get('angle', 0))
                    plt.plot(corners[[0, 1, 2, 3, 0], 0],
                             corners[[0, 1, 2, 3, 0], 1],
                             color=self._get_color(space.get('occupancy', 0)))

                plt.axis('equal')
                plt.savefig(filename, dpi=150, bbox_inches='tight')
            finally:
                # Release the figure even when plotting or saving fails.
                plt.close()

        except Exception as e:
            print(f"Error saving screenshot: {e}")

    def _get_box_corners(self, center, dimensions, angle):
        """Return the four XY corners (4 x 2) of a rotated box footprint."""
        w, h = dimensions[:2]
        corners = np.array([
            [-w/2, -h/2], [w/2, -h/2],
            [w/2, h/2], [-w/2, h/2]
        ], dtype=float)

        # Rotation.
        R = np.array([
            [np.cos(angle), -np.sin(angle)],
            [np.sin(angle), np.cos(angle)]
        ])
        corners = np.dot(corners, R.T)

        # Translation.
        corners += np.asarray(center, dtype=float)[:2]
        return corners

    def _get_color(self, occupancy):
        """Return the colour name for an occupancy ratio."""
        if occupancy > 0.7:
            return 'red'
        elif occupancy > 0.3:
            return 'orange'
        return 'green'

    def close(self):
        """Close the visualization window (safe to call more than once)."""
        if self.vis is not None:
            self.vis.destroy_window()
            self.vis = None

In [None]:
class ParkingSpaceLearner:
    """Accumulate parking-space observations over frames and persist them."""

    def __init__(self, save_path: str = "./parking_model", match_radius: float = 1.0):
        """
        Args:
            save_path: Directory in which the model file is stored.
            match_radius: Maximum centre distance at which a detection without
                an explicit 'space_id' is merged into an already known space.
        """
        self.save_path = Path(save_path)
        self.save_path.mkdir(parents=True, exist_ok=True)
        self.match_radius = match_radius
        self.known_spaces = {}  # learned parking spaces, keyed by space id
        self.space_history = []  # per-frame history of observations

    def _match_known_space(self, center: np.ndarray) -> Optional[int]:
        """Return the id of the closest known space within the match radius."""
        best_id, best_dist = None, self.match_radius
        for space_id, known in self.known_spaces.items():
            dist = float(np.linalg.norm(np.asarray(known['center'], dtype=float) - center))
            if dist < best_dist:
                best_id, best_dist = space_id, dist
        return best_id

    def _next_space_id(self) -> int:
        """Return an integer id that is not used by any known space."""
        candidate = len(self.known_spaces)
        while candidate in self.known_spaces:
            candidate += 1
        return candidate

    def update_space_model(self, frame_id: int, spaces: List[Dict[str, Any]]):
        """Merge the spaces observed in one frame into the model.

        A space carrying a 'space_id' updates that entry. Otherwise it is
        matched to the nearest known centre within `match_radius`; detections
        without an id previously always created a new entry, so the
        running-average branch was never reached.

        Args:
            frame_id: Identifier of the frame the spaces come from.
            spaces: Space dicts with 'center' and 'dimensions' and optionally
                'angle', 'confidence' and 'space_id'.
        """
        for space in spaces:
            center = np.array(space['center'], dtype=float)
            dimensions = np.array(space['dimensions'], dtype=float)
            angle = float(space.get('angle', 0))
            confidence = space.get('confidence', 0)

            space_id = space.get('space_id')
            if space_id is None:
                space_id = self._match_known_space(center)
            if space_id is None:
                space_id = self._next_space_id()

            prev = self.known_spaces.get(space_id)
            if prev is None:
                self.known_spaces[space_id] = {
                    'center': center,
                    'dimensions': dimensions,
                    'angle': angle,
                    'confidence': confidence,
                    'observations': 1
                }
                continue

            # Running mean over all observations of this space.
            alpha = 1.0 / (prev['observations'] + 1)

            prev['center'] = (1 - alpha) * np.asarray(prev['center'], dtype=float) + alpha * center
            prev['dimensions'] = (1 - alpha) * np.asarray(prev['dimensions'], dtype=float) + alpha * dimensions
            # A box orientation repeats every pi, so average along the
            # shortest equivalent difference instead of the raw values.
            delta = (angle - prev['angle'] + np.pi / 2) % np.pi - np.pi / 2
            prev['angle'] = float(prev['angle'] + alpha * delta)
            prev['confidence'] = max(prev['confidence'], confidence)
            prev['observations'] += 1

        # Record the history. NOTE: the space dicts are stored as given,
        # including any per-cluster point arrays they carry.
        self.space_history.append({
            'frame_id': frame_id,
            'spaces': spaces
        })

    def save_model(self):
        """Save the known spaces and the history to `parking_model.npy`."""
        data = {
            'known_spaces': self.known_spaces,
            'history': self.space_history
        }
        self.save_path.mkdir(parents=True, exist_ok=True)
        # The dict is stored as a pickled object array.
        np.save(self.save_path / "parking_model.npy", data, allow_pickle=True)

    def load_model(self) -> bool:
        """Load a previously saved model.

        Returns:
            True when a model file was found and loaded, False otherwise.
        """
        try:
            model_path = self.save_path / "parking_model.npy"
            if model_path.exists():
                # SECURITY: allow_pickle=True executes pickle data; only load
                # model files that come from a trusted source.
                data = np.load(model_path, allow_pickle=True).item()
                self.known_spaces = data['known_spaces']
                self.space_history = data['history']
                return True
            return False
        except Exception as e:
            print(f"Error loading model: {e}")
            return False

In [None]:
def optimize_point_cloud(points, voxel_size=0.1):
    """
    Down-sample a point cloud by replacing the points of each voxel with
    their centroid.

    The grid origin is the per-axis minimum of the input. The previous
    implementation indexed with ``points[np.nonzero(points)]``, which flattens
    the array, so the "minimum" and grid size were scalars and
    ``np.ravel_multi_index`` failed with "parameter multi_index must be a
    sequence of length 1".

    Args:
        points (np.ndarray): The input point cloud, shape (N, D) with finite
            values (D is 3 for XYZ data).
        voxel_size (float, optional): Edge length of a voxel. Defaults to 0.1.

    Returns:
        np.ndarray: One centroid per occupied voxel, shape (V, D), ordered by
        voxel index. Empty input yields an empty array.

    Raises:
        ValueError: If ``voxel_size`` is not positive or ``points`` is not
            two-dimensional.
    """
    if voxel_size <= 0:
        raise ValueError("voxel_size must be positive")

    points = np.asarray(points, dtype=float)
    if points.size == 0:
        n_cols = points.shape[1] if points.ndim == 2 else 3
        return np.empty((0, n_cols))
    if points.ndim != 2:
        raise ValueError("points must be a 2-D array of shape (N, D)")

    # Integer voxel coordinates relative to the per-axis minimum.
    origin = points.min(axis=0)
    voxel_indices = np.floor((points - origin) / voxel_size).astype(np.int64)

    # Flatten the per-axis voxel coordinates into one id per point.
    grid_dimensions = tuple(int(extent) for extent in voxel_indices.max(axis=0) + 1)
    linear_indices = np.ravel_multi_index(tuple(voxel_indices.T), grid_dimensions)

    # Group points by voxel: `inverse` maps each point to its voxel slot.
    _, inverse, counts = np.unique(
        linear_indices, return_inverse=True, return_counts=True)
    inverse = inverse.ravel()

    # Per-voxel centroid = per-axis sum of member points / member count,
    # computed without a Python loop over voxels.
    voxel_centers = np.empty((len(counts), points.shape[1]))
    for axis in range(points.shape[1]):
        voxel_centers[:, axis] = np.bincount(
            inverse, weights=points[:, axis], minlength=len(counts))
    voxel_centers /= counts[:, None]

    return voxel_centers

class OptimizedParkingSpaceSegmentation(ParkingSpaceSegmentation):
    """Segmentation variant that voxel-reduces input and reuses the ground plane."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.prev_ground_model = None  # cache of the last detected ground plane

    def preprocess_pointcloud(self, points: np.ndarray) -> np.ndarray:
        """Voxel-reduce the cloud, then run the base-class preprocessing."""
        # Empty input goes straight to the base class, which handles it.
        if len(points) > 0:
            points = optimize_point_cloud(points)
        return super().preprocess_pointcloud(points)

    def extract_ground_plane(self, points: np.ndarray) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]:
        """Split ground / non-ground points, reusing the cached plane if it still fits.

        The cached plane is accepted when more than 30% of the points lie
        within the ground threshold of it; otherwise the plane is detected
        again by the base class and the cache is refreshed.

        Returns:
            (ground points, non-ground points, plane model parameters).
        """
        if len(points) > 0 and self.prev_ground_model is not None:
            model = np.asarray(self.prev_ground_model, dtype=float)
            normal_length = np.linalg.norm(model[:3])

            if normal_length > 0:
                # Point-to-plane distance; dividing by the normal length keeps
                # it metric even if the stored normal is not unit length.
                distances = np.abs(np.dot(points, model[:3]) + model[3]) / normal_length
                inliers = distances < self.ground_height_threshold

                if np.sum(inliers) > len(points) * 0.3:  # enough ground points
                    ground_points = points[inliers]
                    non_ground = points[~inliers]

                    # Same height filter as the base class, so frames handled
                    # through the cache yield the same kind of non-ground set
                    # as frames handled by a fresh plane fit.
                    height_threshold = np.median(ground_points[:, 2]) + self.ground_height_threshold
                    non_ground = non_ground[non_ground[:, 2] > height_threshold]

                    return ground_points, non_ground, self.prev_ground_model

        # Fresh ground detection.
        ground_points, non_ground, model = super().extract_ground_plane(points)
        if model is not None:
            self.prev_ground_model = model
        return ground_points, non_ground, model

In [None]:
class ParkingSpaceEvaluator:
    """Collect per-frame detection statistics and summarise them."""

    def __init__(self):
        # Running counters plus one list entry per processed frame.
        self.metrics = dict(
            total_frames=0,
            detected_spaces=[],
            changes_detected=[],
            processing_times=[],
            false_positives=0,
            missed_detections=0,
        )

    def update(self, frame_id: int,
               spaces: List[Dict[str, Any]],
               changes: List[Dict[str, Any]],
               processing_time: float):
        """Record the results of one processed frame."""
        record = self.metrics
        record['total_frames'] += 1
        samples = (
            ('detected_spaces', len(spaces)),
            ('changes_detected', len(changes)),
            ('processing_times', processing_time),
        )
        for series_name, sample in samples:
            record[series_name].append(sample)

    def compute_metrics(self) -> Dict[str, float]:
        """Summarise the recorded frames; empty dict when nothing was recorded."""
        record = self.metrics
        if record['total_frames'] == 0:
            return {}

        space_counts = record['detected_spaces']
        change_counts = record['changes_detected']
        durations = record['processing_times']

        summary = {}
        summary['avg_spaces_per_frame'] = np.mean(space_counts)
        summary['avg_changes_per_frame'] = np.mean(change_counts)
        summary['avg_processing_time'] = np.mean(durations)
        summary['std_processing_time'] = np.std(durations)
        summary['detection_stability'] = np.std(space_counts)
        summary['total_frames'] = record['total_frames']
        return summary

    def plot_metrics(self):
        """Plot detections/changes and processing time per frame."""
        if not self.compute_metrics():
            return

        record = self.metrics
        plt.figure(figsize=(12, 8))

        # Upper panel: detection counts over time.
        plt.subplot(2, 1, 1)
        for series_name, legend_label in (('detected_spaces', 'Detected Spaces'),
                                          ('changes_detected', 'Changes')):
            plt.plot(record[series_name], label=legend_label)
        plt.legend()
        plt.title('Detection Results over Time')

        # Lower panel: processing time per frame.
        plt.subplot(2, 1, 2)
        plt.plot(record['processing_times'])
        plt.title('Processing Time per Frame')
        plt.ylabel('Seconds')

        plt.tight_layout()
        plt.show()

In [None]:
# Create the pipeline instances: data loader, segmentation, visualizer,
# model learner and evaluator.
loader = KittiDataLoader()
segmentation = OptimizedParkingSpaceSegmentation(
    min_points=100,
    eps=0.5,
    ground_height_threshold=0.3,
    min_space_width=2.5,
    min_space_length=5.0
)
# headless=True asks for an invisible rendering window.
visualizer = ParkingSpaceVisualization(headless=True)
learner = ParkingSpaceLearner()
evaluator = ParkingSpaceEvaluator()

Failed to initialize visualizer: 'NoneType' object has no attribute 'background_color'
Running in pure headless mode...


In [None]:
# Try to load a previously saved model (the cell displays the returned flag)
learner.load_model()

True

In [None]:
# Check for the dataset and download it when it is missing
if not loader.check_data_exists():
    print("Downloading KITTI dataset...")
    # download_sample_data reports failure through its return value; surface
    # it here instead of silently continuing without data.
    if not loader.download_sample_data():
        print("KITTI dataset download failed; no frames will be available.")
else:
    print("KITTI dataset already exists")

KITTI dataset already exists


In [None]:
# Load the frames
frames = loader.load_frames(num_frames=5)
print(f"Loaded {len(frames)} frames")

Loading frames: 100%|██████████| 5/5 [00:02<00:00,  2.43it/s]

Successfully loaded 5 frames
Loaded 5 frames





In [None]:
# Process and visualize each frame
try:
    previous_spaces = None

    for i, frame in enumerate(frames):
        print(f"\nProcessing frame {i+1}/{len(frames)}")
        start_time = time.time()

        # Point cloud preprocessing
        processed_points = segmentation.preprocess_pointcloud(frame)
        if len(processed_points) == 0:
            print("Warning: No points after preprocessing")
            continue

        # Parking space detection: the first processed frame only detects,
        # later frames are also compared against the previous one.
        if previous_spaces is None:
            current_spaces = segmentation.detect_parking_spaces(processed_points)
            print(f"Detected {len(current_spaces)} initial parking spaces")
            changes = []
        else:
            current_spaces, changes = segmentation.monitor_changes(
                previous_spaces, processed_points)

        # Model update
        learner.update_space_model(i, current_spaces)

        # Evaluation metrics update
        processing_time = time.time() - start_time
        evaluator.update(i, current_spaces, changes, processing_time)

        # Report the changes
        if changes:
            print(f"Detected {len(changes)} changes:")
            for change in changes:
                if change['change_type'] == 'occupancy_change':
                    print(f"- Space {change['space_id']}: "
                          f"Occupancy changed by {change['occupancy_change']:.1%}")
                else:
                    print(f"- New space detected at {change['center']}")

        # Visualize the result
        visualizer.visualize_frame(processed_points, current_spaces)
        visualizer.save_screenshot(f"frame_{i+1}.png")

        previous_spaces = current_spaces

        # The pause only serves an on-screen window; skip it when nothing is
        # being shown (headless mode or no visualizer).
        if visualizer.vis is not None and not visualizer.headless:
            time.sleep(1)

finally:
    # Save the results and clean up; the visualizer is closed even when
    # saving the model fails.
    try:
        learner.save_model()
    finally:
        visualizer.close()


Processing frame 1/5


ValueError: parameter multi_index must be a sequence of length 1

In [None]:
# Print the evaluation results (nothing is listed when no frame was evaluated)
print("\nPerformance Metrics:")
metrics = evaluator.compute_metrics()
for key, value in metrics.items():
    print(f"{key}: {value:.3f}")

In [None]:
# Plot the evaluation metrics
evaluator.plot_metrics()

In [None]:
# Display the result images
print("\nProcessing results:")
for i in range(len(frames)):
    image_path = Path(f"frame_{i+1}.png")
    # A screenshot is missing when its frame was skipped or processing was
    # aborted; report it instead of failing on the missing file.
    if image_path.exists():
        display(Image(str(image_path)))
    else:
        print(f"frame_{i+1}.png was not generated; skipping")