<a href="https://colab.research.google.com/github/JSKimGitHub/testfile/blob/main/Untitled33.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==== Install PyTorch 2.4.1 + CUDA 12.1 ====
!pip install -q --upgrade --force-reinstall \
  torch==2.4.1 torchvision==0.19.1 torchaudio==2.4.1 \
  --index-url https://download.pytorch.org/whl/cu121

# ==== Re-align packages left in conflict by the forced reinstall ====
!pip install -q \
  "numpy==2.0.2" \
  "fsspec==2025.3.0" \
  "typing_extensions>=4.14.0" \
  "filelock>=3.15" \
  "jedi"

# (Optional) commonly used packages
!pip install -q opencv-python tqdm


[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ipython 7.34.0 requires jedi>=0.16, which is not installed.
gcsfs 2025.3.0 requires fsspec==2025.3.0, but you have fsspec 2024.6.1 which is incompatible.
typeguard 4.4.4 requires typing_extensions>=4.14.0, but you have typing-extensions 4.12.2 which is incompatible.
numba 0.60.0 requires numpy<2.1,>=1.22, but you have numpy 2.1.2 which is incompatible.
pytensor 2.31.7 requires filelock>=3.15, but you have filelock 3.13.1 which is incompatible.[0m[31m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.9/60.9 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.2/19.2 MB[0m [31m118.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m193.6/193.6 kB[0m [31m21.8 MB/s[0m eta [36m0:00:00[0m
[

In [None]:
#!/usr/bin/env python3
"""
Jetson Optimized Road Segmentation and Autonomous Driving System
Colab-friendly version (no argparse; run via run_colab(...))
"""

import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
from IPython.display import display, HTML
from collections import deque

# Prefer gymnasium, then classic gym. If neither can be imported, fall back to
# a minimal stand-in that exposes only the pieces this file uses
# (gym.Env, spaces.Discrete, spaces.Box).
try:
    import gymnasium as gym
    from gymnasium import spaces
except Exception:
    try:
        import gym  # fallback to classic gym
        from gym import spaces
    except Exception:
        # Minimal fallback for environments without gym
        class _MiniEnv:
            """Stand-in base class for ``gym.Env``.

            Subclasses in this file call ``super().reset(seed=seed)``, so the
            base must provide a ``reset`` that accepts that keyword.
            """

            def reset(self, seed=None, options=None):
                # _MiniDiscrete.sample() draws from NumPy's global generator,
                # so seeding it here makes sampled actions reproducible.
                if seed is not None:
                    np.random.seed(seed)
                return None

        class _MiniDiscrete:
            """Discrete action space with ``n`` actions numbered 0..n-1."""

            def __init__(self, n):
                self.n = n

            def sample(self):
                return int(np.random.randint(self.n))

        class _MiniBox:
            """Plain record of a continuous space's bounds, shape and dtype."""

            def __init__(self, low, high, shape, dtype=np.float32):
                self.low, self.high, self.shape, self.dtype = low, high, shape, dtype

        class _MiniSpaces:
            Discrete = _MiniDiscrete
            Box = _MiniBox

        class _MiniGym:
            Env = _MiniEnv

        gym, spaces = _MiniGym(), _MiniSpaces()

import time
import os
import gc

# Colab ships without TensorRT/PyCUDA, so import them optionally.
# JETSON_AVAILABLE records whether all three imports succeeded; when they
# fail, a notice is printed and the flag is set to False.
try:
    import tensorrt as trt
    import pycuda.driver as cuda
    import pycuda.autoinit
    JETSON_AVAILABLE = True
except Exception:
    print("Info: TensorRT/CUDA not available here. Running in CPU/Colab mode.")
    JETSON_AVAILABLE = False

# Select the torch device: CUDA when torch reports it as available, else CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("GPU mode enabled")
else:
    device = torch.device("cpu")
    print("CPU mode enabled")

# Memory monitoring relies on psutil, which may not be installed;
# _HAS_PSUTIL records whether the import succeeded.
try:
    import psutil
    _HAS_PSUTIL = True
except Exception:
    _HAS_PSUTIL = False

class JetsonMemoryManager:
    """Watch GPU and host memory and release caches when usage runs high.

    Safe to use without a GPU: the GPU probe only runs when the
    TensorRT/PyCUDA imports succeeded (``JETSON_AVAILABLE``), and the host
    probe only runs when psutil could be imported (``_HAS_PSUTIL``).
    """

    def __init__(self):
        # Fraction of GPU memory in use above which a cleanup is triggered.
        self.max_gpu_memory = 0.8  # 80%

    def check_memory(self):
        """Probe GPU memory, then host memory; clean up if either is too full."""
        if JETSON_AVAILABLE:
            self._probe_gpu()
        if _HAS_PSUTIL:
            self._probe_host()

    def _probe_gpu(self):
        """Best-effort GPU check; any failure while probing is ignored."""
        try:
            info = cuda.mem_get_info()
            free_gb = info[0] / (1024**3)
            total_gb = info[1] / (1024**3)
            used_fraction = (total_gb - free_gb) / total_gb
            if used_fraction > self.max_gpu_memory:
                print(f"Warning: GPU memory usage high: {used_fraction:.2%}")
                self.cleanup_memory()
        except Exception:
            pass

    def _probe_host(self):
        """Best-effort host-RAM check; any failure while probing is ignored."""
        try:
            vm = psutil.virtual_memory()
            if vm.percent > 80:
                print(f"Warning: CPU memory usage high: {vm.percent:.1f}%")
                self.cleanup_memory()
        except Exception:
            pass

    def cleanup_memory(self):
        """Run the garbage collector and, with CUDA present, empty torch's cache."""
        gc.collect()
        if not torch.cuda.is_available():
            return
        torch.cuda.empty_cache()

class JetsonOptimizedLaneDetector:
    """Classical lane detector: colour mask -> ROI -> Canny -> Hough -> line fit.

    ``process_frame`` returns ``(lanes, lane_state)`` where ``lanes`` is
    ``[left_lane, right_lane]``. Each lane is ``((x_bottom, y_bottom),
    (x_top, y_top))`` in pixel coordinates of the frame passed in, or ``None``
    when that lane has never been detected.
    """

    def __init__(self, memory_manager):
        self.memory_manager = memory_manager
        self.prev_lane_state = "center"
        self.change_counter = 0
        self.threshold_frames = 5   # consecutive differing frames before a change is confirmed
        self.margin = 50            # pixel tolerance around the image centre for "center"
        self.vanish_history = []
        self.lane_width_history = []
        self.left_detect = False
        self.right_detect = False
        self.left_m = self.right_m = 0
        self.left_b = self.right_b = (0, 0)
        # Last known lanes as [left, right]; used when a side is not detected.
        self.prev_lanes = [None, None]
        self.frame_skip = 1
        self.frame_count = 0
        self.img_center = None

    def filter_colors(self, image):
        """Return a binary mask of white-or-yellow pixels of a BGR image.

        Images wider than 640 px are downscaled to width 640 first, so the
        returned mask may be smaller than ``image``.
        """
        height, width = image.shape[:2]
        if width > 640:
            scale = 640 / width
            image = cv2.resize(image, (int(width*scale), int(height*scale)))
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        lower_white, upper_white = np.array([0,0,180]), np.array([180,30,255])
        white_mask = cv2.inRange(hsv, lower_white, upper_white)
        lower_yellow, upper_yellow = np.array([18,50,50]), np.array([30,255,255])
        yellow_mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
        return cv2.bitwise_or(white_mask, yellow_mask)

    def limit_region(self, image):
        """Keep only a trapezoid covering the lower part of the image."""
        height, width = image.shape[:2]
        mask = np.zeros_like(image)
        polygon = np.array([
            (int(0.05*width), height),
            (int(0.35*width), int(0.62*height)),
            (int(0.65*width), int(0.62*height)),
            (int(0.95*width), height),
            ], dtype=np.int32)
        cv2.fillPoly(mask, [polygon], 255)
        return cv2.bitwise_and(image, mask)

    def houghLines(self, image):
        """Run the probabilistic Hough transform on an edge image."""
        return cv2.HoughLinesP(image, 1, np.pi/180, 40, minLineLength=30, maxLineGap=120)

    @staticmethod
    def _to_frame_coords(lines, mask_shape, frame_shape):
        """Rescale Hough segments from mask pixels to frame pixels.

        ``filter_colors`` may shrink the image, while callers draw lanes on and
        normalise by the original frame; without this step the two coordinate
        systems disagree for frames wider than 640 px. Returns ``lines``
        untouched when nothing was detected or the sizes already match.
        """
        if lines is None or tuple(mask_shape) == tuple(frame_shape):
            return lines
        mask_h, mask_w = mask_shape
        frame_h, frame_w = frame_shape
        # Each segment is (x1, y1, x2, y2): scale x by width ratio, y by height ratio.
        scale = np.array([frame_w / mask_w, frame_h / mask_h] * 2, dtype=np.float64)
        return np.rint(lines * scale).astype(np.int32)

    def separateLine(self, image, lines):
        """Split segments into (left, right) by slope sign.

        Vertical segments and segments with ``|slope| < 0.3`` are dropped.
        With image y growing downward, negative slope means a left lane line.
        Also records the horizontal centre of ``image`` in ``self.img_center``.
        """
        left, right = [], []
        height, width = image.shape[:2]
        self.img_center = width // 2
        slope_thresh = 0.3
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line[0]
                if x2 - x1 == 0:
                    continue
                slope = (y2 - y1) / (x2 - x1)
                if abs(slope) < slope_thresh:
                    continue
                (left if slope < 0 else right).append(line[0])
        return left, right

    @staticmethod
    def _fit_lane(segments, height):
        """Fit x = m*y + b through all segment endpoints.

        Returns ``(lane, m, b)`` where ``lane`` spans from the bottom row
        (``y = height``) up to ``y = 0.6 * height``, or ``None`` when there are
        no segments to fit.
        """
        if len(segments) == 0:
            return None
        xs = [s[0] for s in segments] + [s[2] for s in segments]
        ys = [s[1] for s in segments] + [s[3] for s in segments]
        m, b = np.polyfit(ys, xs, 1)
        top = height * 0.6
        start = (int(m*height + b), height)
        end = (int(m*top + b), int(top))
        return (start, end), m, b

    def regression(self, separated, image):
        """Fit one line per side and return ``[left_lane, right_lane]``.

        ``separated`` is the ``(left, right)`` pair from ``separateLine``;
        ``image`` supplies the height the lanes are drawn against. A side with
        no segments reuses the previous result for that side (possibly None).

        The result is ordered left-then-right because every consumer unpacks
        it as ``left_lane, right_lane = lanes``.
        """
        left, right = separated
        height = image.shape[0]

        left_fit = self._fit_lane(left, height)
        if left_fit is not None:
            left_lane, self.left_m, self.left_b = left_fit
            self.left_detect = True
        else:
            left_lane = self.prev_lanes[0]

        right_fit = self._fit_lane(right, height)
        if right_fit is not None:
            right_lane, self.right_m, self.right_b = right_fit
            self.right_detect = True
        else:
            right_lane = self.prev_lanes[1]

        lanes = [left_lane, right_lane]
        self.prev_lanes = lanes
        return lanes

    def detect_lane_change(self, lanes):
        """Classify the lane centre against the image centre.

        Returns "center", "left" or "right" for the current frame, or
        ``"change_to_<state>"`` on the frame where a differing state has been
        seen for ``threshold_frames`` frames and becomes the stored state.
        Returns "center" when either lane is missing.
        """
        if len(lanes) < 2 or self.img_center is None:
            return "center"
        left_lane, right_lane = lanes
        if left_lane is None or right_lane is None:
            return "center"

        left_center = (left_lane[0][0] + left_lane[1][0]) // 2
        right_center = (right_lane[0][0] + right_lane[1][0]) // 2
        lane_center = (left_center + right_center) // 2

        if abs(lane_center - self.img_center) < self.margin:
            current = "center"
        elif lane_center < self.img_center:
            current = "left"
        else:
            current = "right"

        if current != self.prev_lane_state:
            self.change_counter += 1
            if self.change_counter >= self.threshold_frames:
                self.prev_lane_state = current
                return f"change_to_{current}"
        else:
            self.change_counter = 0
        return current

    def process_frame(self, frame):
        """Run the full pipeline on one BGR frame.

        Returns ``(lanes, lane_state)``; on frames skipped by ``frame_skip``
        the previous lanes and the stored lane state are returned instead.
        """
        self.frame_count += 1
        if self.frame_count % self.frame_skip != 0:
            return self.prev_lanes, self.prev_lane_state
        self.memory_manager.check_memory()
        filtered = self.filter_colors(frame)
        roi = self.limit_region(filtered)
        edges = cv2.Canny(roi, 50, 150)
        lines = self.houghLines(edges)
        # Bring detections back into frame coordinates before fitting, so the
        # lanes line up with the frame that callers draw on.
        lines = self._to_frame_coords(lines, edges.shape[:2], frame.shape[:2])
        separated = self.separateLine(frame, lines)
        lanes = self.regression(separated, frame)
        lane_state = self.detect_lane_change(lanes)
        return lanes, lane_state

class JetsonOptimizedDQN(nn.Module):
    """Small MLP Q-network: two 64-unit ReLU layers with dropout 0.2.

    Maps a ``state_dim`` vector to ``action_dim`` Q-values. The module moves
    itself to CUDA on construction when CUDA is available.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden = 64
        layers = [
            nn.Linear(state_dim, hidden),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden, action_dim),
        ]
        self.fc = nn.Sequential(*layers)
        if torch.cuda.is_available():
            self.cuda()

    def forward(self, x):
        """Return the Q-values for ``x``."""
        return self.fc(x)

class SimSiamEncoder(nn.Module):
    """Lightweight SimSiam-style self-supervised encoder.

    Three stride-2 conv blocks followed by global average pooling produce a
    128-d vector, which ``projector`` maps to ``feature_dim``. ``predictor``
    is the extra head applied on top of the projection during training.
    """

    def __init__(self, feature_dim=128):
        super().__init__()

        conv_layers = []
        in_channels = 3
        for out_channels in (32, 64, 128):
            conv_layers.append(nn.Conv2d(in_channels, out_channels, 3, 2, 1))
            conv_layers.append(nn.BatchNorm2d(out_channels))
            conv_layers.append(nn.ReLU(inplace=True))
            in_channels = out_channels
        conv_layers.append(nn.AdaptiveAvgPool2d((1, 1)))
        self.backbone = nn.Sequential(*conv_layers)

        self.projector = nn.Sequential(
            nn.Linear(128, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(inplace=True),
            nn.Linear(256, feature_dim),
        )
        self.predictor = nn.Sequential(
            nn.Linear(feature_dim, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Linear(128, feature_dim),
        )

        if torch.cuda.is_available():
            self.cuda()

    def encode(self, x):
        """Return the projected embedding for an image batch ``x``."""
        pooled = self.backbone(x)
        flat = pooled.view(pooled.size(0), -1)
        return self.projector(flat)

    def forward(self, x):
        """Alias for :meth:`encode`."""
        return self.encode(x)

def _negative_cosine_similarity(p, z):
    """Return the batch mean of ``-cos(p, z)`` taken along dim 1.

    ``z`` is detached first, so no gradient flows into the target branch.
    """
    target = F.normalize(z.detach(), dim=1)
    prediction = F.normalize(p, dim=1)
    per_sample = (prediction * target).sum(dim=1)
    return -per_sample.mean()

class SSLPretrainer:
    """Train a ``SimSiamEncoder`` on raw frames and extract per-frame features.

    Args:
        input_size: ``(width, height)`` every frame is resized to.
        feature_dim: Size of the embedding produced by the encoder.
        lr: Adam learning rate.
    """

    def __init__(self, input_size=(128,128), feature_dim=128, lr=1e-3):
        self.input_w, self.input_h = input_size
        self.feature_dim = feature_dim
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = SimSiamEncoder(feature_dim=feature_dim)
        self.model.to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        # Mixed precision is only used on CUDA. The scaler guards the fp16
        # backward pass against gradient underflow and is a no-op when disabled.
        self.use_amp = self.device.type == "cuda"
        self.scaler = torch.amp.GradScaler("cuda", enabled=self.use_amp)

    def _augment(self, imgs):
        """Return one randomly augmented view per image as a float tensor.

        Each image gets a random crop (80-100% of each side), a resize to the
        configured input size, contrast/brightness jitter and, with
        probability 0.3, a Gaussian blur. The result is channel-first, scaled
        to [0, 1] and placed on ``self.device``.
        """
        batch = []
        for img in imgs:
            h, w = img.shape[:2]
            crop_scale = random.uniform(0.8, 1.0)
            ch, cw = int(h*crop_scale), int(w*crop_scale)
            y0 = random.randint(0, max(0, h-ch))
            x0 = random.randint(0, max(0, w-cw))
            cropped = img[y0:y0+ch, x0:x0+cw]
            resized = cv2.resize(cropped, (self.input_w, self.input_h))
            alpha = random.uniform(0.8, 1.2)  # contrast
            beta = random.uniform(-20, 20)    # brightness
            jitter = cv2.convertScaleAbs(resized, alpha=alpha, beta=beta)
            if random.random() < 0.3:
                k = random.choice([3,5])
                jitter = cv2.GaussianBlur(jitter, (k,k), 0)
            batch.append(jitter)
        batch = np.stack(batch, axis=0)
        batch = torch.from_numpy(batch).permute(0,3,1,2).float()/255.0
        return batch.to(self.device)

    def train_epoch(self, frames, batch_size=32):
        """Run one pass over ``frames`` and return the mean batch loss.

        Returns 0.0 when no batch could be trained (no frames, or only a
        single frame).
        """
        self.model.train()
        num_frames = len(frames)
        if num_frames == 0:
            return 0.0
        order = np.random.permutation(num_frames)
        total_loss, num_batches = 0.0, 0
        for start in range(0, num_frames, batch_size):
            idx = order[start:start + batch_size]
            # BatchNorm1d cannot compute batch statistics from a single sample
            # in training mode, so a leftover batch of one is skipped.
            if len(idx) < 2:
                continue
            imgs = [frames[i] for i in idx]
            v1, v2 = self._augment(imgs), self._augment(imgs)
            with torch.autocast(device_type=self.device.type, enabled=self.use_amp):
                z1, z2 = self.model.encode(v1), self.model.encode(v2)
                p1, p2 = self.model.predictor(z1), self.model.predictor(z2)
                loss = _negative_cosine_similarity(p1, z2)/2.0 + _negative_cosine_similarity(p2, z1)/2.0
            self.optimizer.zero_grad()
            self.scaler.scale(loss).backward()
            self.scaler.step(self.optimizer)
            self.scaler.update()
            total_loss += loss.item()
            num_batches += 1
        return total_loss / max(1, num_batches)

    def extract_features(self, frames, batch_size=64):
        """Encode every frame and return a ``(len(frames), feature_dim)`` array.

        Always returns a float32 ndarray; for empty input its shape is
        ``(0, feature_dim)``.
        """
        self.model.eval()
        feats = []
        with torch.no_grad():
            for start in range(0, len(frames), batch_size):
                batch = frames[start:start+batch_size]
                x_np = [cv2.resize(img, (self.input_w, self.input_h)) for img in batch]
                x = torch.from_numpy(np.stack(x_np, axis=0)).permute(0,3,1,2).float().to(self.device)/255.0
                z = self.model.encode(x)
                feats.append(z.cpu().numpy())
        if not feats:
            return np.empty((0, self.feature_dim), dtype=np.float32)
        return np.concatenate(feats, axis=0)

class JetsonDrivingEnv(gym.Env):
    """Simple driving environment that replays a fixed sequence of video frames.

    Observation: ``[left_x, right_x, lane_center_x, car_x]``, each divided by
    the frame width, optionally followed by the frame's SSL feature vector.
    Actions: 0 = steer left, 1 = keep straight, 2 = steer right.

    Args:
        frames: Non-empty sequence of frames; the first one sets the frame size.
        lane_detector: Object whose ``process_frame(frame)`` returns
            ``(lanes, lane_state)``.
        memory_manager: Stored on the instance; not used by the environment itself.
        ssl_features: Optional per-frame feature matrix.
        use_ssl_features: Append ``ssl_features`` to the observation. Only
            honoured when features are given and there is one row per frame.

    Raises:
        ValueError: If ``frames`` is empty.
    """

    def __init__(self, frames, lane_detector, memory_manager, ssl_features=None, use_ssl_features=False):
        super().__init__()
        if len(frames) == 0:
            raise ValueError("frames empty")
        self.frames = frames
        self.current_idx = 0
        self.env_h, self.env_w = frames[0].shape[:2]
        self.car_x = self.env_w // 2
        self.lane_detector = lane_detector
        self.memory_manager = memory_manager
        self.ssl_features = ssl_features
        self.use_ssl_features = bool(
            use_ssl_features
            and (ssl_features is not None)
            and (len(ssl_features) == len(frames))
        )
        # Only a 2-D array has a per-row feature width to read.
        if ssl_features is not None and getattr(ssl_features, 'ndim', 0) == 2:
            self.ssl_feature_dim = int(ssl_features.shape[1])
        else:
            self.ssl_feature_dim = 0
        self.action_space = spaces.Discrete(3)  # 0:L, 1:F, 2:R
        obs_dim = 4 + (self.ssl_feature_dim if self.use_ssl_features else 0)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(obs_dim,), dtype=np.float32)
        self.step_count = 0
        self.reward_history = []

    def reset(self, seed=None, options=None):
        """Start a new episode at the first frame with the car centred.

        ``options`` is accepted for API compatibility and ignored.
        Returns ``(observation, info)`` with an empty info dict.
        """
        base_reset = getattr(super(), "reset", None)
        if base_reset is not None:
            try:
                base_reset(seed=seed)
            except (TypeError, NotImplementedError):
                # Some classic gym releases define Env.reset without a seed
                # keyword, or as an abstract method; nothing to delegate then.
                pass
        self.current_idx = 0
        self.step_count = 0
        # Without this the car keeps the previous episode's final position,
        # so every episode after the first starts off-centre.
        self.car_x = self.env_w // 2
        return self._get_state(), {}

    def _get_state(self):
        """Build the observation for the current frame.

        Past the last frame an all-zero vector is returned. Until both lanes
        are available the lane part of the state is the neutral
        ``(0.0, 1.0, 0.5)``: lanes at the frame borders, centre in the middle.
        """
        if self.current_idx >= len(self.frames):
            dim = 4 + (self.ssl_feature_dim if self.use_ssl_features else 0)
            return np.zeros(dim, dtype=np.float32)
        frame = self.frames[self.current_idx]
        lanes, lane_state = self.lane_detector.process_frame(frame)
        state = np.array([0.0, 1.0, 0.5, self.car_x / self.env_w], dtype=np.float32)
        if lanes and len(lanes) >= 2:
            left_lane, right_lane = lanes[0], lanes[1]
            if left_lane is not None and right_lane is not None:
                left_center = (left_lane[0][0] + left_lane[1][0]) / 2
                right_center = (right_lane[0][0] + right_lane[1][0]) / 2
                state[0] = left_center / self.env_w
                state[1] = right_center / self.env_w
                state[2] = (left_center + right_center) / 2 / self.env_w
        if self.use_ssl_features and self.ssl_features is not None:
            feat = self.ssl_features[self.current_idx].astype(np.float32)
            state = np.concatenate([state, feat], axis=0)
        return state

    def step(self, action):
        """Apply ``action``, advance one frame and score the new state.

        Returns ``(observation, reward, terminated, truncated, info)``;
        ``truncated`` is always False.
        """
        self.step_count += 1
        if action == 0:
            self.car_x = max(0, self.car_x - 10)
        elif action == 2:
            self.car_x = min(self.env_w, self.car_x + 10)
        self.current_idx += 1
        state = self._get_state()
        reward = self._calculate_reward(state, action)
        self.reward_history.append(reward)
        done = self.current_idx >= len(self.frames) - 1
        info = {'step': self.step_count, 'frame_idx': self.current_idx, 'reward': reward}
        return state, reward, done, False, info

    def _calculate_reward(self, state, action):
        """Reward staying near the lane centre.

        Uses the normalised distance between car and lane centre:
        ``2 * (0.5 - dist)`` plus a 0.5 survival bonus, with an extra -5.0
        penalty once the distance exceeds 0.35. ``action`` is not used.
        """
        lane_center, car_pos = state[2], state[3]
        dist = abs(car_pos - lane_center)
        reward = 2.0 * (0.5 - dist)   # dist=0 -> +1, dist=0.5 -> 0
        # Penalise only large departures from the lane centre.
        if dist > 0.35:
            reward -= 5.0
        reward += 0.5                 # small survival bonus
        return reward

class JetsonRoadSegmentationSystem:
    """Lane detection + small DQN steering policy + optional SSL features.

    Typical use is ``run_single(path)``: load the video, optionally pretrain
    the SSL encoder, train a DQN on the frame-replay environment, then render
    an annotated output video.

    Args:
        video_path: Stored for reference; ``run_single`` takes its own path.
        output_path: Where ``process_video`` writes the annotated video.
        feature_dim: SSL embedding size.
        ssl_input_size: ``(width, height)`` fed to the SSL encoder.
    """

    # Optimisation steps between copies of the policy weights into the target net.
    TARGET_SYNC_INTERVAL = 200
    # Recognised video extensions, compared case-insensitively.
    VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov', '.mkv')

    def __init__(self, video_path=None, output_path="/content/output_jetson.mp4",
                 feature_dim=128, ssl_input_size=(128,128)):
        self.video_path = video_path
        self.output_path = output_path
        self.memory_manager = JetsonMemoryManager()
        self.feature_dim = feature_dim
        self.ssl_input_size = ssl_input_size
        self.fps_counter = 0
        self.fps_start_time = time.time()
        self.processing_times = []
        print("Jetson Road Segmentation System Initialized")
        print(f"TensorRT Available: {JETSON_AVAILABLE}")

    def pretrain_ssl(self, frames, epochs=5, batch_size=32, lr=1e-3):
        """Train the SSL encoder on ``frames``; return ``(features, model)``."""
        print("Starting SSL pretraining (SimSiam)...")
        pretrainer = SSLPretrainer(input_size=self.ssl_input_size, feature_dim=self.feature_dim, lr=lr)
        for ep in range(epochs):
            loss = pretrainer.train_epoch(frames, batch_size=batch_size)
            print(f"SSL Epoch {ep+1}/{epochs} - Loss: {loss:.4f}")
        feats = pretrainer.extract_features(frames, batch_size=64)
        print(f"SSL done. Feature shape: {np.shape(feats)}")
        return feats, pretrainer.model

    def _load_video_file(self, video_path):
        """Read all frames of one video, downscaling to at most 640 px wide.

        Raises:
            FileNotFoundError: If no frame could be read from the file.
        """
        cap = cv2.VideoCapture(video_path)
        file_frames = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                h, w = frame.shape[:2]
                if w > 640:
                    scale = 640 / w
                    frame = cv2.resize(frame, (int(w*scale), int(h*scale)))
                file_frames.append(frame)
        finally:
            # Release the capture handle even if decoding or resizing fails.
            cap.release()
        if not file_frames:
            raise FileNotFoundError(f"Cannot read frames from: {video_path}")
        return file_frames

    def load_video(self, video_path):
        """Load frames from a video file, or from every video in a directory.

        Directory entries are read in sorted name order and concatenated.

        Raises:
            FileNotFoundError: If the path does not exist or a file yields no frames.
        """
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video path not found: {video_path}")
        frames = []
        if os.path.isdir(video_path):
            print(f"Loading videos from directory: {video_path}")
            files = [
                os.path.join(video_path, name)
                for name in sorted(os.listdir(video_path))
                if os.path.splitext(name)[1].lower() in self.VIDEO_EXTENSIONS
            ]
            if not files:
                print("No video files found in directory.")
            for fp in files:
                print(f"Loading: {os.path.basename(fp)}")
                f = self._load_video_file(fp)
                frames.extend(f)
                print(f"Loaded {len(f)} frames from {os.path.basename(fp)} (total {len(frames)})")
        else:
            print(f"Loading video frames from file: {video_path}")
            frames = self._load_video_file(video_path)
        print(f"Total frames loaded: {len(frames)}")
        return frames

    @staticmethod
    def _build_state(lanes, car_x, width, feature=None):
        """Build the policy input the same way the training environment does.

        Lane entries default to the neutral ``(0.0, 1.0, 0.5)`` unless both
        lanes are available, so inference sees the same featurisation the
        policy was trained on. ``feature`` is appended when given.
        """
        state = np.array([0.0, 1.0, 0.5, car_x / width], dtype=np.float32)
        if lanes and len(lanes) >= 2:
            left_lane, right_lane = lanes[0], lanes[1]
            if left_lane is not None and right_lane is not None:
                left_center = (left_lane[0][0] + left_lane[1][0]) / 2
                right_center = (right_lane[0][0] + right_lane[1][0]) / 2
                state[0] = left_center / width
                state[1] = right_center / width
                state[2] = (left_center + right_center) / (2 * width)
        if feature is not None:
            state = np.concatenate([state, np.asarray(feature, dtype=np.float32)], axis=0)
        return state

    @staticmethod
    def _select_action(policy_net, state):
        """Return the greedy action for ``state``.

        The network is switched to eval mode for the forward pass so dropout
        does not randomise the choice, then restored to its previous mode.
        """
        net_device = next(policy_net.parameters()).device
        was_training = policy_net.training
        policy_net.eval()
        try:
            with torch.no_grad():
                q = policy_net(torch.as_tensor(state, dtype=torch.float32, device=net_device))
        finally:
            policy_net.train(was_training)
        return int(q.argmax().item())

    @staticmethod
    def _dqn_update(policy_net, target_net, optimizer, batch, gamma):
        """Do one DQN gradient step on ``batch`` and return the loss as a float.

        ``batch`` is a sequence of ``(state, action, reward, next_state, done)``.
        """
        net_device = next(policy_net.parameters()).device
        # Stack into single arrays first: building a tensor from a list of
        # ndarrays converts element by element and is very slow.
        states = torch.as_tensor(np.stack([t[0] for t in batch]), dtype=torch.float32, device=net_device)
        next_states = torch.as_tensor(np.stack([t[3] for t in batch]), dtype=torch.float32, device=net_device)
        actions = torch.as_tensor([int(t[1]) for t in batch], dtype=torch.long, device=net_device)
        rewards = torch.as_tensor([float(t[2]) for t in batch], dtype=torch.float32, device=net_device)
        dones = torch.as_tensor([float(t[4]) for t in batch], dtype=torch.float32, device=net_device)

        current_q = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        with torch.no_grad():
            next_q = target_net(next_states).max(1)[0]
            target_q = rewards + gamma * (1.0 - dones) * next_q
        loss = F.mse_loss(current_q, target_q)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        return float(loss.item())

    def process_video(self, frames, policy_net=None, ssl_features=None, use_ssl_features=False):
        """Annotate every frame and write the result to ``self.output_path``.

        Draws detected lanes, the lane centre, lane state, FPS and progress;
        when ``policy_net`` is given, also the chosen steering action.
        Returns None. Does nothing but print a notice when ``frames`` is empty.

        Raises:
            IOError: If the output video cannot be opened for writing.
        """
        print("Processing video...")
        if len(frames) == 0:
            print("No frames to process")
            return

        height, width = frames[0].shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(self.output_path, fourcc, 30.0, (width, height))
        if not out.isOpened():
            out.release()
            raise IOError(f"Cannot open video writer for: {self.output_path}")

        lane_detector = JetsonOptimizedLaneDetector(self.memory_manager)
        car_x = width // 2
        total_frames = len(frames)
        action_names = {0:'LEFT',1:'FORWARD',2:'RIGHT'}
        fps = 0.0

        # Start the timing statistics at the beginning of this run rather
        # than at construction time, which may predate a long training phase.
        self.fps_counter = 0
        self.processing_times = []
        start_time = time.time()
        self.fps_start_time = start_time

        try:
            for i, frame in enumerate(frames):
                frame_start = time.time()
                lanes, lane_state = lane_detector.process_frame(frame)
                overlay = frame.copy()

                if lanes and len(lanes) >= 2:
                    left_lane, right_lane = lanes
                    if left_lane is not None:
                        cv2.line(overlay, left_lane[0], left_lane[1], (0,255,0), 3)
                    if right_lane is not None:
                        cv2.line(overlay, right_lane[0], right_lane[1], (0,255,0), 3)
                    if left_lane is not None and right_lane is not None:
                        lc = ((left_lane[0][0]+left_lane[1][0])//2,(left_lane[0][1]+left_lane[1][1])//2)
                        rc = ((right_lane[0][0]+right_lane[1][0])//2,(right_lane[0][1]+right_lane[1][1])//2)
                        lane_center = ((lc[0]+rc[0])//2,(lc[1]+rc[1])//2)
                        cv2.circle(overlay, lane_center, 5, (0,0,255), -1)

                if policy_net is not None:
                    feature = None
                    if use_ssl_features and ssl_features is not None and i < len(ssl_features):
                        feature = ssl_features[i]
                    state = self._build_state(lanes, car_x, width, feature)
                    action = self._select_action(policy_net, state)
                    if action == 0:
                        car_x = max(0, car_x-10)
                    elif action == 2:
                        car_x = min(width, car_x+10)
                    cv2.putText(overlay, f"Action: {action_names.get(action,'?')}", (10,150),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0,200,255), 2)

                cv2.putText(overlay, f"Lane: {lane_state}", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)

                self.fps_counter += 1
                now = time.time()
                if now - self.fps_start_time >= 1.0:
                    fps = self.fps_counter / (now - self.fps_start_time)
                    self.fps_counter = 0
                    self.fps_start_time = now
                cv2.putText(overlay, f"FPS: {fps:.1f}", (10,70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)

                progress = (i+1)/total_frames*100
                cv2.putText(overlay, f"Progress: {progress:.1f}%", (10,110), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2)

                self.processing_times.append(time.time() - frame_start)
                out.write(overlay)

                if i % 100 == 0:
                    elapsed = time.time() - start_time
                    avg_fps = (i+1)/max(elapsed,1e-6)
                    print(f"Processed {i+1}/{total_frames} frames, Avg FPS: {avg_fps:.1f}, "
                          f"Avg Proc Time: {np.mean(self.processing_times):.3f}s")
        finally:
            # Finalise the output file even if a frame fails part-way through.
            out.release()

        total_time = time.time() - start_time
        avg_fps = total_frames / max(total_time,1e-6)
        avg_pt = np.mean(self.processing_times)
        print("\nProcessing completed!")
        print(f"Total frames: {total_frames}")
        print(f"Total time: {total_time:.2f}s")
        print(f"Average FPS: {avg_fps:.1f}")
        print(f"Average processing time per frame: {avg_pt:.3f}s")
        print(f"Output saved to: {self.output_path}")

        # Best-effort inline preview for notebooks; never fails the run.
        try:
            from base64 import b64encode
            from IPython.display import HTML, display
            with open(self.output_path, 'rb') as fh:
                mp4 = fh.read()
            data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
            display(HTML(f"""
            <video width="800" controls>
              <source src="{data_url}" type="video/mp4">
            </video>
            """))
        except Exception as exc:
            print(f"Inline preview skipped: {exc}")

    def run_single(self, video_file, enable_ssl=False, ssl_epochs=5, dqn_episodes=50):
        """Load one video (or directory), train a DQN on it and render the output.

        Args:
            video_file: Path handed to ``load_video``.
            enable_ssl: Pretrain the SSL encoder and append its features to the state.
            ssl_epochs: Number of SSL pretraining epochs.
            dqn_episodes: Number of DQN training episodes over the frame sequence.
        """
        frames = self.load_video(video_file)
        ssl_features = None
        if enable_ssl:
            ssl_features, _ = self.pretrain_ssl(frames, epochs=ssl_epochs)

        lane_detector = JetsonOptimizedLaneDetector(self.memory_manager)
        env = JetsonDrivingEnv(frames, lane_detector, self.memory_manager,
                               ssl_features=ssl_features, use_ssl_features=enable_ssl)

        # Size the networks from the environment so they always match the
        # observations it actually produces.
        state_dim = int(env.observation_space.shape[0])
        use_ssl = bool(env.use_ssl_features)

        policy_net = JetsonOptimizedDQN(state_dim, 3)
        target_net = JetsonOptimizedDQN(state_dim, 3)
        target_net.load_state_dict(policy_net.state_dict())
        # The target network only provides bootstrap values; keep dropout off.
        target_net.eval()
        optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
        buffer = deque(maxlen=10000)

        gamma, epsilon, batch_size = 0.99, 0.1, 32
        updates = 0
        for episode in range(dqn_episodes):
            state, _ = env.reset()
            total_reward = 0.0
            while True:
                if random.random() < epsilon:
                    action = int(env.action_space.sample())
                else:
                    action = self._select_action(policy_net, state)
                next_state, reward, done, _, _ = env.step(action)
                total_reward += reward
                buffer.append((state, action, reward, next_state, done))
                if len(buffer) >= batch_size:
                    batch = random.sample(buffer, batch_size)
                    self._dqn_update(policy_net, target_net, optimizer, batch, gamma)
                    updates += 1
                    # Periodically refresh the target network; otherwise the
                    # bootstrap targets come from the initial random weights forever.
                    if updates % self.TARGET_SYNC_INTERVAL == 0:
                        target_net.load_state_dict(policy_net.state_dict())
                state = next_state
                if done:
                    break
            if episode % 10 == 0:
                print(f"Episode {episode}, Reward: {total_reward:.2f}")
        self.process_video(frames, policy_net, ssl_features=ssl_features, use_ssl_features=use_ssl)

    # (The original continual-learning DQN is long, so the Colab version offers
    # single-video execution by default.)
    # A run_continual entry point could be kept alongside if needed, but Colab
    # use is usually best suited to single-video experiments.

def run_colab(video_path,
              output_path="/content/output_jetson.mp4",
              enable_ssl=False,
              ssl_epochs=5,
              dqn_episodes=50,
              feature_dim=128,
              ssl_input_size=(128,128)):
    """One-line helper for running the whole pipeline from a Colab cell.

    Builds a ``JetsonRoadSegmentationSystem`` from the path and SSL settings,
    then calls its ``run_single`` on ``video_path`` with the training options.
    """
    system_kwargs = {
        "video_path": video_path,
        "output_path": output_path,
        "feature_dim": feature_dim,
        "ssl_input_size": ssl_input_size,
    }
    pipeline = JetsonRoadSegmentationSystem(**system_kwargs)
    pipeline.run_single(
        video_path,
        enable_ssl=enable_ssl,
        ssl_epochs=ssl_epochs,
        dqn_episodes=dqn_episodes,
    )

# Usage example:
# run_colab("/content/drive/MyDrive/sample.mp4", enable_ssl=False, dqn_episodes=30)


Info: TensorRT/CUDA not available here. Running in CPU/Colab mode.
GPU mode enabled


In [None]:
run_colab("/content/2_1.mp4", enable_ssl=False, dqn_episodes=30)

Jetson Road Segmentation System Initialized
TensorRT Available: False
Loading video frames from file: /content/2_1.mp4
Total frames loaded: 514
Episode 0, Reward: 373.27
Episode 10, Reward: 679.87
Episode 20, Reward: 715.29
Processing video...
Processed 1/514 frames, Avg FPS: 191.0, Avg Proc Time: 0.003s
Processed 101/514 frames, Avg FPS: 217.5, Avg Proc Time: 0.003s
Processed 201/514 frames, Avg FPS: 216.4, Avg Proc Time: 0.003s
Processed 301/514 frames, Avg FPS: 209.2, Avg Proc Time: 0.003s
Processed 401/514 frames, Avg FPS: 204.5, Avg Proc Time: 0.003s
Processed 501/514 frames, Avg FPS: 203.2, Avg Proc Time: 0.003s

Processing completed!
Total frames: 514
Total time: 2.53s
Average FPS: 202.9
Average processing time per frame: 0.003s
Output saved to: /content/output_jetson.mp4
