In [1]:
# Detect whether the notebook is running inside Google Colab.
# The check only looks at already-imported modules, so it has no side effects.
import sys
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    print("Colab 환경에서 실행 중입니다. 필요한 라이브러리를 설치합니다...")
    # NOTE(review): these are unpinned `!pip` shell installs; consider `%pip` with
    # pinned versions so a fresh runtime reproduces the same environment.
    !pip install -q torch torchvision
    !pip install -q transformers
    !pip install -q opencv-python matplotlib scipy scikit-image tqdm
    !pip install -q huggingface_hub
    !pip install -q timm
    # NOTE(review): transformers is installed a second time here, from the GitHub
    # repository, after the PyPI install above - confirm which one is intended.
    !pip install -q git+https://github.com/huggingface/transformers.git
    !pip install -q filterpy  # library providing the Kalman filter
else:
    # Outside Colab nothing is installed; the imports below must already resolve.
    print("로컬 환경에서 실행 중입니다. 필요한 라이브러리가 이미 설치되어 있다고 가정합니다.")

# Import required libraries
import os
import random
from collections import defaultdict

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
from filterpy.kalman import KalmanFilter  # Kalman filter used by the tracker
from scipy.optimize import linear_sum_assignment
from scipy.spatial import distance
from skimage.measure import regionprops
from tqdm.notebook import tqdm
from transformers import AutoImageProcessor, AutoModelForSemanticSegmentation

if IN_COLAB:
    from google.colab import files

# Configure and load the Hugging Face segmentation model
def setup_segmentation_model():
    """Load the SegFormer Cityscapes checkpoint and derive the class-id groups.

    Returns:
        A 6-tuple ``(model, image_processor, device, road_classes,
        vehicle_classes, cityscapes_classes)`` where ``road_classes`` and
        ``vehicle_classes`` are lists of indices into ``cityscapes_classes``.
        The model is moved to CUDA when ``torch.cuda.is_available()`` and to the
        CPU otherwise.
    """
    model_name = "nvidia/segformer-b5-finetuned-cityscapes-1024-1024"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(f"모델 '{model_name}' 로드 중...")
    image_processor = AutoImageProcessor.from_pretrained(model_name)
    model = AutoModelForSemanticSegmentation.from_pretrained(model_name)
    model.to(device)

    # Class names listed in index order; the position in this list is the class id
    # used to interpret the segmentation output elsewhere in the notebook.
    cityscapes_classes = [
        'road', 'sidewalk', 'building', 'wall', 'fence', 'pole', 'traffic light',
        'traffic sign', 'vegetation', 'terrain', 'sky', 'person', 'rider', 'car',
        'truck', 'bus', 'train', 'motorcycle', 'bicycle'
    ]
    vehicle_names = ('car', 'truck', 'bus', 'train', 'motorcycle', 'bicycle')

    road_classes = [idx for idx, label in enumerate(cityscapes_classes) if label == 'road']
    vehicle_classes = [idx for idx, label in enumerate(cityscapes_classes) if label in vehicle_names]

    road_pairs = [(idx, cityscapes_classes[idx]) for idx in road_classes]
    vehicle_pairs = [(idx, cityscapes_classes[idx]) for idx in vehicle_classes]
    print(f"도로 클래스: {road_pairs}")
    print(f"탈것 클래스: {vehicle_pairs}")

    return model, image_processor, device, road_classes, vehicle_classes, cityscapes_classes

# Load the model once at notebook scope. `road_classes` and `vehicle_classes`
# are bound as globals here because segment_frame() reads them from module scope.
model, image_processor, device, road_classes, vehicle_classes, classes = setup_segmentation_model()
print("모델 로드 완료!")



Colab 환경에서 실행 중입니다. 필요한 라이브러리를 설치합니다...
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.5/207.5 MB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m21.1/21.1 MB[0m [31m67.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for transformers (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.0/178.0 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for filterpy (setup.py) ... [?25l[?25hdone
모델 'nvidia/segformer-b5-finetuned-cityscapes-1024-1024' 로드 중...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/273 [00:00<?, ?B/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


config.json:   0%|          | 0.00/1.68k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/339M [00:00<?, ?B/s]

도로 클래스: [(0, 'road')]
탈것 클래스: [(13, 'car'), (14, 'truck'), (15, 'bus'), (16, 'train'), (17, 'motorcycle'), (18, 'bicycle')]
모델 로드 완료!
영상 파일을 업로드해주세요 (mp4 형식):
업로드된 영상: input_video.mp4
Error: 비디오 파일을 열 수 없습니다.
Error: 비디오 파일을 열 수 없습니다.


In [14]:
# Segmentation helper
def segment_frame(model, image_processor, frame, device):
    """Segment one BGR frame and derive binary road / vehicle masks.

    Args:
        model: Segmentation model; called with the processor output as keyword
            arguments and expected to expose ``.logits`` on its result.
        image_processor: Callable invoked as
            ``image_processor(images=..., return_tensors="pt")`` returning a
            mapping of tensors.
        frame: Image in BGR channel order (it is converted with COLOR_BGR2RGB).
        device: Torch device the input tensors are moved to.

    Returns:
        ``(road_mask, vehicle_mask, sem_seg)``: two uint8 masks holding 255 where
        the predicted class id is in the module-level ``road_classes`` /
        ``vehicle_classes`` lists and 0 elsewhere, plus the per-pixel class-id
        map they were derived from.
    """
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame_size = rgb_frame.shape[:2]

    encoded = image_processor(images=rgb_frame, return_tensors="pt")
    model_inputs = {name: tensor.to(device) for name, tensor in encoded.items()}
    with torch.no_grad():
        logits = model(**model_inputs).logits

    # Resize the logits back to the frame resolution before taking the argmax.
    full_res_logits = torch.nn.functional.interpolate(
        logits, size=frame_size, mode="bilinear", align_corners=False
    )
    # squeeze() drops singleton dimensions - presumably the batch of one image.
    sem_seg = full_res_logits.argmax(dim=1).squeeze().cpu().numpy()

    def class_mask(class_ids):
        # 255 wherever the predicted class id is one of class_ids, else 0.
        return np.where(np.isin(sem_seg, class_ids), 255, 0).astype(np.uint8)

    return class_mask(road_classes), class_mask(vehicle_classes), sem_seg

# Initial ROI (road + vehicles)
def create_initial_roi(road_mask, vehicle_mask):
    """Return the bitwise OR of the road mask and the vehicle mask.

    The result marks every pixel that is set in either input mask.
    """
    return cv2.bitwise_or(road_mask, vehicle_mask)


In [15]:
# Select the input video.
# The path is assigned unconditionally so the cells below also work outside
# Colab; previously it was only defined inside the Colab branch, which left
# `video_path` undefined (NameError) on a local run.
# NOTE(review): nothing is uploaded here - the file must already exist at this
# path in the working directory.
video_path = "in1.mp4"
if IN_COLAB:
    print("영상 파일을 업로드해주세요 (mp4 형식):")
    print(f"업로드된 영상: {video_path}")


영상 파일을 업로드해주세요 (mp4 형식):
업로드된 영상: in1.mp4


In [16]:
# Inspect basic video metadata
def get_video_info(video_path):
    """Read the frame size, FPS and frame count of a video file.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        A dict with keys ``"width"``, ``"height"``, ``"fps"`` and
        ``"frame_count"`` (width, height and frame_count converted to int), or
        ``None`` after printing an error when the capture cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    if not capture.isOpened():
        print("Error: 비디오 파일을 열 수 없습니다.")
        return None

    info = {
        "width": int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
        "height": int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        "fps": capture.get(cv2.CAP_PROP_FPS),
        "frame_count": int(capture.get(cv2.CAP_PROP_FRAME_COUNT)),
    }
    capture.release()
    return info

# Print a short summary of the selected video (skipped when it cannot be opened).
video_info = get_video_info(video_path)
if video_info:
    print(f"비디오 크기: {video_info['width']}x{video_info['height']}")
    print(f"FPS: {video_info['fps']}")
    print(f"총 프레임 수: {video_info['frame_count']}")
    # A reported FPS of 0 would make the duration a division by zero,
    # so only derive the duration from a positive frame rate.
    if video_info['fps'] > 0:
        print(f"영상 길이: {video_info['frame_count']/video_info['fps']:.2f}초")
    else:
        print("영상 길이: 알 수 없음 (FPS 정보 없음)")

# Segmentation helper
# NOTE(review): this repeats the segment_frame definition from an earlier cell;
# whichever cell runs last wins. Consider keeping a single definition.
def segment_frame(model, image_processor, frame, device):
    """Segment one BGR frame and derive binary road / vehicle masks.

    Args:
        model: Segmentation model; called with the processor output as keyword
            arguments and expected to expose ``.logits`` on its result.
        image_processor: Callable invoked as
            ``image_processor(images=..., return_tensors="pt")`` returning a
            mapping of tensors.
        frame: Image in BGR channel order (it is converted with COLOR_BGR2RGB).
        device: Torch device the input tensors are moved to.

    Returns:
        ``(road_mask, vehicle_mask, sem_seg)``: two uint8 masks holding 255 where
        the predicted class id is in the module-level ``road_classes`` /
        ``vehicle_classes`` lists and 0 elsewhere, plus the per-pixel class-id
        map they were derived from.
    """
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    target_size = rgb_frame.shape[:2]

    processor_output = image_processor(images=rgb_frame, return_tensors="pt")
    on_device = {key: value.to(device) for key, value in processor_output.items()}
    with torch.no_grad():
        logits = model(**on_device).logits

    # Bring the logits back to the frame resolution, then pick the best class.
    resized_logits = torch.nn.functional.interpolate(
        logits, size=target_size, mode="bilinear", align_corners=False
    )
    # squeeze() drops singleton dimensions - presumably the batch of one image.
    sem_seg = resized_logits.argmax(dim=1).squeeze().cpu().numpy()

    # 255 wherever the predicted class id belongs to the group, 0 elsewhere.
    road_mask = np.where(np.isin(sem_seg, road_classes), 255, 0).astype(np.uint8)
    vehicle_mask = np.where(np.isin(sem_seg, vehicle_classes), 255, 0).astype(np.uint8)

    return road_mask, vehicle_mask, sem_seg

# Initial ROI (road + vehicles)
# NOTE(review): this repeats the create_initial_roi definition from an earlier
# cell; consider keeping a single definition.
def create_initial_roi(road_mask, vehicle_mask):
    """Combine the road and vehicle masks into one region-of-interest mask.

    A pixel is set in the result when it is set in either input (bitwise OR).
    """
    return cv2.bitwise_or(road_mask, vehicle_mask)

# Pixel-change detection
def detect_pixel_changes(prev_frame, curr_frame, threshold=30):
    """Build a binary mask of pixels whose grayscale value changed between frames.

    Both BGR frames are converted to grayscale and their absolute difference is
    binarised with ``cv2.THRESH_BINARY`` using ``threshold`` and a max value of 255.

    Args:
        prev_frame: Earlier frame in BGR channel order.
        curr_frame: Later frame in BGR channel order.
        threshold: Threshold passed to ``cv2.threshold`` on the difference image.

    Returns:
        The thresholded difference mask.
    """
    prev_gray, curr_gray = (
        cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) for image in (prev_frame, curr_frame)
    )
    # cv2.threshold returns a (value, image) pair; only the image is needed.
    return cv2.threshold(cv2.absdiff(prev_gray, curr_gray), threshold, 255, cv2.THRESH_BINARY)[1]

# Vehicle object property extraction
def extract_vehicle_properties(vehicle_mask):
    """Split a vehicle mask into connected components and describe each one.

    Components are found with 8-connectivity. Label 0 is skipped (OpenCV uses it
    for the background) and so is every component whose area is below 100 pixels.

    Args:
        vehicle_mask: Binary mask passed to ``cv2.connectedComponentsWithStats``.

    Returns:
        A list of dicts, one per kept component, with keys ``'id'`` (always
        ``None`` here), ``'bbox'`` as ``(x, y, w, h)``, ``'centroid'`` as
        ``(cx, cy)``, ``'area'`` and ``'mask'`` (uint8, 255 inside the component).
    """
    label_count, label_map, component_stats, component_centroids = cv2.connectedComponentsWithStats(
        vehicle_mask, connectivity=8
    )

    def describe(label):
        # Assemble the record for one component label.
        row = component_stats[label]
        center_x, center_y = component_centroids[label]
        return {
            'id': None,
            'bbox': (
                row[cv2.CC_STAT_LEFT],
                row[cv2.CC_STAT_TOP],
                row[cv2.CC_STAT_WIDTH],
                row[cv2.CC_STAT_HEIGHT],
            ),
            'centroid': (center_x, center_y),
            'area': row[cv2.CC_STAT_AREA],
            'mask': (label_map == label).astype(np.uint8) * 255,
        }

    return [
        describe(label)
        for label in range(1, label_count)
        if not component_stats[label, cv2.CC_STAT_AREA] < 100
    ]

# IoU computation
def calculate_iou(mask1, mask2):
    """Return the intersection-over-union of two masks.

    Both inputs are combined with ``np.logical_and`` / ``np.logical_or``, so any
    non-zero element counts as set. When the union is empty the result is 0.
    """
    union = np.logical_or(mask1, mask2).sum()
    if union == 0:
        return 0
    return np.logical_and(mask1, mask2).sum() / union

# Scene-Centric 트래킹 클래스
class SceneCentricVehicleTracker:
    def __init__(self, iou_threshold=0.3, distance_threshold=100):
        self.next_id = 0
        self.tracked_vehicles = {}
        self.disappeared = {}
        self.max_disappeared = 7  # ID 유지 7프레임 고정
        self.iou_threshold = iou_threshold
        self.distance_threshold = distance_threshold
        self.trajectories = defaultdict(list)
        self.colors = {}
        self.kalman_filters = {}
        self.tracking_memory = defaultdict(list)

    def _init_kalman(self, centroid):
        kf = KalmanFilter(dim_x=4, dim_z=2)
        kf.x = np.array([centroid[0], centroid[1], 0, 0])
        kf.F = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]])
        kf.H = np.array([[1, 0, 0, 0], [0, 1, 0, 0]])
        kf.P *= 1000.
        kf.R = np.eye(2) * 5
        kf.Q = np.eye(4) * 0.1
        return kf

    def _assign_color(self, vehicle_id):
        if vehicle_id not in self.colors:
            self.colors[vehicle_id] = (
                random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)
            )
        return self.colors[vehicle_id]

    def _register(self, vehicle, frame_idx):
        vehicle['id'] = self.next_id
        self.tracked_vehicles[self.next_id] = vehicle
        self.disappeared[self.next_id] = 0
        self.trajectories[self.next_id].append(vehicle['centroid'])
        self.kalman_filters[self.next_id] = self._init_kalman(vehicle['centroid'])
        self._assign_color(self.next_id)
        self.tracking_memory[self.next_id].append({
            'frame': frame_idx,
            'centroid': vehicle['centroid'],
            'area': vehicle['area']
        })
        self.next_id += 1

    def update(self, vehicles, frame_idx):
        if len(vehicles) == 0:
            for vehicle_id in list(self.disappeared.keys()):
                self.disappeared[vehicle_id] += 1
                if self.disappeared[vehicle_id] > self.max_disappeared:
                    del self.tracked_vehicles[vehicle_id]
                    del self.disappeared[vehicle_id]
                    del self.kalman_filters[vehicle_id]
            return self.tracked_vehicles

        for vehicle_id in self.tracked_vehicles.keys():
            kf = self.kalman_filters[vehicle_id]
            kf.predict()
            predicted_centroid = (kf.x[0], kf.x[1])
            self.tracked_vehicles[vehicle_id]['predicted_centroid'] = predicted_centroid

        if len(self.tracked_vehicles) == 0:
            for vehicle in vehicles:
                self._register(vehicle, frame_idx)
        else:
            self._match_vehicles(vehicles, frame_idx)

        return self.tracked_vehicles

    def _match_vehicles(self, vehicles, frame_idx):
        tracked_ids = list(self.tracked_vehicles.keys())
        match_matrix = np.zeros((len(tracked_ids), len(vehicles)))

        for i, tracked_id in enumerate(tracked_ids):
            tracked_vehicle = self.tracked_vehicles[tracked_id]
            tracked_mask = tracked_vehicle['mask']
            tracked_centroid = tracked_vehicle.get('predicted_centroid', tracked_vehicle['centroid'])

            for j, vehicle in enumerate(vehicles):
                iou = calculate_iou(tracked_mask > 0, vehicle['mask'] > 0)
                dist = distance.euclidean(tracked_centroid, vehicle['centroid'])
                if iou > self.iou_threshold or dist < self.distance_threshold:
                    score = iou + (1.0 - min(1.0, dist / self.distance_threshold))
                    match_matrix[i, j] = score

        from scipy.optimize import linear_sum_assignment
        tracked_indices, vehicle_indices = linear_sum_assignment(-match_matrix)

        unmatched_tracked = set(range(len(tracked_ids))) - set(tracked_indices)
        unmatched_vehicles = set(range(len(vehicles))) - set(vehicle_indices)

        for tracked_idx, vehicle_idx in zip(tracked_indices, vehicle_indices):
            if match_matrix[tracked_idx, vehicle_idx] == 0:
                unmatched_tracked.add(tracked_idx)
                unmatched_vehicles.add(vehicle_idx)
                continue
            tracked_id = tracked_ids[tracked_idx]
            self.tracked_vehicles[tracked_id] = vehicles[vehicle_idx]
            self.tracked_vehicles[tracked_id]['id'] = tracked_id
            self.disappeared[tracked_id] = 0
            self.trajectories[tracked_id].append(vehicles[vehicle_idx]['centroid'])
            kf = self.kalman_filters[tracked_id]
            kf.update(np.array([vehicles[vehicle_idx]['centroid'][0], vehicles[vehicle_idx]['centroid'][1]]))
            self.tracking_memory[tracked_id].append({
                'frame': frame_idx,
                'centroid': vehicles[vehicle_idx]['centroid'],
                'area': vehicles[vehicle_idx]['area']
            })

        for tracked_idx in unmatched_tracked:
            tracked_id = tracked_ids[tracked_idx]
            self.disappeared[tracked_id] += 1
            if self.disappeared[tracked_id] > self.max_disappeared:
                del self.tracked_vehicles[tracked_id]
                del self.disappeared[tracked_id]
                del self.kalman_filters[tracked_id]

        for vehicle_idx in unmatched_vehicles:
            self._register(vehicles[vehicle_idx], frame_idx)

    def visualize(self, frame):
        vis_frame = frame.copy()
        for vehicle_id, vehicle in self.tracked_vehicles.items():
            color = self._assign_color(vehicle_id)
            x, y, w, h = vehicle['bbox']
            cv2.rectangle(vis_frame, (x, y), (x+w, y+h), color, 2)
            cv2.putText(vis_frame, f"ID: {vehicle_id}", (x, y-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
            cx, cy = vehicle['centroid']
            cv2.circle(vis_frame, (int(cx), int(cy)), 5, color, -1)
            trajectory = self.trajectories[vehicle_id]
            if len(trajectory) > 1:
                for i in range(1, len(trajectory)):
                    pt1 = (int(trajectory[i-1][0]), int(trajectory[i-1][1]))
                    pt2 = (int(trajectory[i][0]), int(trajectory[i][1]))
                    cv2.line(vis_frame, pt1, pt2, color, 2)
        return vis_frame

    def validate_tracking(self):
        for vehicle_id, memory in self.tracking_memory.items():
            if len(memory) > 1:
                frames = [entry['frame'] for entry in memory]
                duration = max(frames) - min(frames) + 1
                print(f"ID {vehicle_id}: 트래킹 기간 {duration} 프레임, 총 기록 {len(memory)}")

# Full pipeline
def process_video(video_path, output_path, model, image_processor, device):
    """Track vehicles through a video and write an annotated copy.

    The first frame is segmented once to build a fixed region of interest (road
    plus vehicle pixels). For each later frame, segmentation is re-run only when
    frame differencing finds change inside that ROI; otherwise the most recent
    vehicle mask is reused. Every frame is annotated with the tracker state and
    a translucent ROI overlay, then written to ``output_path``.

    The capture, writer and progress bar are released in a ``finally`` block, so
    an exception or a manual interrupt still closes the output file.

    Args:
        video_path: Input video path.
        output_path: Path of the annotated video (written with the 'mp4v' fourcc).
        model: Segmentation model forwarded to segment_frame().
        image_processor: Image processor forwarded to segment_frame().
        device: Torch device forwarded to segment_frame().

    Returns:
        ``output_path`` on success; ``None`` when the video cannot be opened or
        its first frame cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: 비디오 파일을 열 수 없습니다.")
        return

    out = None
    pbar = None
    try:
        # Build the ROI from the first frame. Without a first frame there is no
        # ROI and nothing to process, so stop instead of failing later on.
        ret, first_frame = cap.read()
        if not ret:
            print("Error: 첫 프레임을 읽을 수 없습니다.")
            return
        road_mask, vehicle_mask, _ = segment_frame(model, image_processor, first_frame, device)
        initial_roi = create_initial_roi(road_mask, vehicle_mask)
        # Rewind so the main loop starts again from the first frame.
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
        tracker = SceneCentricVehicleTracker()

        # The ROI never changes, so its overlay image is built once up front.
        roi_overlay = np.zeros_like(first_frame)
        roi_overlay[initial_roi > 0] = [255, 0, 0]

        prev_frame = None
        last_vehicle_mask = None
        pbar = tqdm(total=frame_count, desc="비디오 처리 중")
        frame_idx = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            if prev_frame is None:
                # First frame: there is nothing to diff against, always segment.
                _, vehicle_mask, _ = segment_frame(model, image_processor, frame, device)
                last_vehicle_mask = vehicle_mask
            else:
                # Scene-centric step: only look at changes inside the ROI.
                diff_mask = detect_pixel_changes(prev_frame, frame)
                roi_diff = cv2.bitwise_and(diff_mask, initial_roi)
                # The sum is over mask values rather than a pixel count -
                # presumably 0/255, which would make this about four changed pixels.
                if np.sum(roi_diff) > 1000:
                    _, vehicle_mask, _ = segment_frame(model, image_processor, frame, device)
                    last_vehicle_mask = vehicle_mask
                else:
                    # No relevant change: reuse the mask of the last segmentation
                    # (always set, because the first frame is always segmented).
                    vehicle_mask = last_vehicle_mask

            # Extract vehicles and update the tracker.
            vehicles = extract_vehicle_properties(vehicle_mask)
            tracked_vehicles = tracker.update(vehicles, frame_idx)
            vis_frame = tracker.visualize(frame)

            # Blend the ROI overlay on top of the tracking visualisation.
            vis_frame = cv2.addWeighted(vis_frame, 1.0, roi_overlay, 0.2, 0)

            cv2.putText(vis_frame, f"Frame: {frame_idx}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            cv2.putText(vis_frame, f"Vehicles: {len(tracked_vehicles)}", (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            out.write(vis_frame)
            prev_frame = frame.copy()
            pbar.update(1)
            frame_idx += 1

        # Report how long each id was tracked.
        tracker.validate_tracking()
    finally:
        # Always release the handles, including on errors and interrupts.
        cap.release()
        if out is not None:
            out.release()
        if pbar is not None:
            pbar.close()

    print(f"처리 완료! 결과 저장 경로: {output_path}")
    return output_path

# Run the video processing
output_path = "output_video.mp4"
result_path = process_video(video_path, output_path, model, image_processor, device)

# Offer the download only when this run reported success. Checking the file
# alone could hand out a stale output left over from an earlier run.
if IN_COLAB and result_path is not None and os.path.exists(result_path):
    files.download(result_path)

비디오 크기: 720x480
FPS: 30.039658051666603
총 프레임 수: 913
영상 길이: 30.39초


비디오 처리 중:   0%|          | 0/913 [00:00<?, ?it/s]

KeyboardInterrupt: 