# 6.2.3 影片分析任務 - 中級練習

本練習專注於影片處理和時序分析，包括物體追蹤、行為分析、場景變化檢測等技術。

## 練習目標
- 掌握影片讀取和處理流程
- 實現多物體追蹤算法
- 學習時序特徵分析
- 實現行為模式識別
- 優化影片處理性能

## 難度等級: ⭐⭐⭐ (中級)

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import os
import sys
import time
from typing import List, Tuple, Optional, Dict
from collections import deque, defaultdict

sys.path.append('../../utils')
from image_utils import load_image, resize_image
from visualization import display_image, display_multiple_images
from performance import time_function

plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

print("✅ 環境設置完成")

## 挑戰1: 實現多物體追蹤系統

In [None]:
class MultiObjectTracker:
    """多物體追蹤器"""
    
    def __init__(self, max_disappeared=30, max_distance=50):
        """初始化追蹤器"""
        self.next_object_id = 0
        self.objects = {}
        self.disappeared = {}
        
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance
        
        # 追蹤歷史
        self.tracking_history = defaultdict(lambda: deque(maxlen=50))
        
        print("✅ 多物體追蹤器初始化完成")
    
    def register(self, centroid):
        """註冊新物體"""
        self.objects[self.next_object_id] = centroid
        self.disappeared[self.next_object_id] = 0
        self.next_object_id += 1
    
    def deregister(self, object_id):
        """註銷物體"""
        del self.objects[object_id]
        del self.disappeared[object_id]
        if object_id in self.tracking_history:
            del self.tracking_history[object_id]
    
    def update(self, detections):
        """更新追蹤器"""
        if len(detections) == 0:
            # 沒有檢測到物體，增加disappeared計數
            for object_id in list(self.disappeared.keys()):
                self.disappeared[object_id] += 1
                
                if self.disappeared[object_id] > self.max_disappeared:
                    self.deregister(object_id)
            
            return self.objects
        
        # 初始化輸入質心數組
        input_centroids = np.array(detections)
        
        if len(self.objects) == 0:
            # 沒有現存物體，註冊所有檢測
            for centroid in input_centroids:
                self.register(centroid)
        else:
            # 計算現有物體質心和輸入質心之間的距離
            object_centroids = list(self.objects.values())
            object_ids = list(self.objects.keys())
            
            # 計算距離矩陣
            D = np.linalg.norm(np.array(object_centroids)[:, np.newaxis] - input_centroids, axis=2)
            
            # 找到最小距離的匹配
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]
            
            used_row_indices = set()
            used_col_indices = set()
            
            # 更新現有物體位置
            for (row, col) in zip(rows, cols):
                if row in used_row_indices or col in used_col_indices:
                    continue
                
                if D[row, col] <= self.max_distance:
                    object_id = object_ids[row]
                    self.objects[object_id] = input_centroids[col]
                    self.disappeared[object_id] = 0
                    
                    # 記錄追蹤歷史
                    self.tracking_history[object_id].append(tuple(input_centroids[col]))
                    
                    used_row_indices.add(row)
                    used_col_indices.add(col)
            
            # 處理未匹配的檢測和物體
            unused_rows = set(range(0, D.shape[0])).difference(used_row_indices)
            unused_cols = set(range(0, D.shape[1])).difference(used_col_indices)
            
            # 增加未匹配物體的disappeared計數
            if D.shape[0] >= D.shape[1]:
                for row in unused_rows:
                    object_id = object_ids[row]
                    self.disappeared[object_id] += 1
                    
                    if self.disappeared[object_id] > self.max_disappeared:
                        self.deregister(object_id)
            
            # 註冊新物體
            else:
                for col in unused_cols:
                    self.register(input_centroids[col])
        
        return self.objects
    
    def get_tracking_stats(self):
        """獲取追蹤統計"""
        stats = {
            'total_objects': len(self.objects),
            'active_tracks': len([obj_id for obj_id, disappeared in self.disappeared.items() if disappeared == 0]),
            'track_histories': {obj_id: len(history) for obj_id, history in self.tracking_history.items()}
        }
        return stats

# 創建追蹤器
tracker = MultiObjectTracker()
print("✅ 多物體追蹤器初始化完成")

## 挑戰2: 場景變化檢測

In [None]:
class SceneChangeDetector:
    """場景變化檢測器"""
    
    def __init__(self, threshold=0.3, history_length=10):
        """初始化場景變化檢測器"""
        self.threshold = threshold
        self.history_length = history_length
        self.frame_history = deque(maxlen=history_length)
        self.change_history = deque(maxlen=100)
        
    def calculate_frame_difference(self, frame1, frame2):
        """計算幀間差異"""
        # 轉換為灰階
        gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) if len(frame1.shape) == 3 else frame1
        gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) if len(frame2.shape) == 3 else frame2
        
        # 計算結構相似性指數 (SSIM)
        # 簡化版本：使用平均絕對差值
        diff = cv2.absdiff(gray1, gray2)
        mean_diff = np.mean(diff) / 255.0
        
        return mean_diff
    
    def detect_scene_change(self, current_frame):
        """檢測場景變化"""
        if len(self.frame_history) == 0:
            self.frame_history.append(current_frame)
            return False, 0.0
        
        # 與最近幾幀比較
        max_diff = 0.0
        for prev_frame in list(self.frame_history)[-3:]:  # 比較最近3幀
            diff = self.calculate_frame_difference(current_frame, prev_frame)
            max_diff = max(max_diff, diff)
        
        # 記錄變化歷史
        self.change_history.append(max_diff)
        
        # 更新幀歷史
        self.frame_history.append(current_frame)
        
        # 判斷是否為場景變化
        scene_changed = max_diff > self.threshold
        
        return scene_changed, max_diff
    
    def analyze_temporal_patterns(self):
        """分析時序模式"""
        if len(self.change_history) < 10:
            return None
        
        changes = list(self.change_history)
        
        analysis = {
            'mean_change': np.mean(changes),
            'std_change': np.std(changes),
            'max_change': np.max(changes),
            'scene_changes': sum(1 for change in changes if change > self.threshold),
            'stability': 1.0 - (np.std(changes) / (np.mean(changes) + 1e-6))
        }
        
        return analysis

# 創建場景變化檢測器
scene_detector = SceneChangeDetector()
print("✅ 場景變化檢測器初始化完成")

## 挑戰3: 行為分析系統

In [None]:
class BehaviorAnalyzer:
    """行為分析器"""
    
    def __init__(self):
        """初始化行為分析器"""
        self.behavior_patterns = {
            'stationary': [],     # 靜止行為
            'walking': [],        # 行走行為
            'running': [],        # 跑步行為
            'loitering': [],      # 徘徊行為
            'suspicious': []      # 可疑行為
        }
        
        self.speed_thresholds = {
            'stationary': (0, 2),
            'walking': (2, 15),
            'running': (15, 50),
            'vehicle': (50, 200)
        }
    
    def analyze_motion_pattern(self, track_history):
        """分析運動模式"""
        if len(track_history) < 5:
            return 'unknown'
        
        # 計算速度變化
        speeds = []
        for i in range(1, len(track_history)):
            p1 = track_history[i-1]
            p2 = track_history[i]
            speed = np.linalg.norm(np.array(p2) - np.array(p1))
            speeds.append(speed)
        
        avg_speed = np.mean(speeds)
        
        # 根據平均速度分類行為
        for behavior, (min_speed, max_speed) in self.speed_thresholds.items():
            if min_speed <= avg_speed < max_speed:
                return behavior
        
        return 'unknown'
    
    def detect_loitering(self, track_history, time_threshold=30):
        """檢測徘徊行為"""
        if len(track_history) < time_threshold:
            return False
        
        # 檢查最近N個位置的變化範圍
        recent_positions = track_history[-time_threshold:]
        positions = np.array(recent_positions)
        
        # 計算位置變化範圍
        x_range = np.max(positions[:, 0]) - np.min(positions[:, 0])
        y_range = np.max(positions[:, 1]) - np.min(positions[:, 1])
        
        # 如果移動範圍很小，認為是徘徊
        movement_range = max(x_range, y_range)
        
        return movement_range < 50  # 50像素範圍內認為是徘徊
    
    def analyze_trajectory(self, track_history):
        """分析軌跡特性"""
        if len(track_history) < 3:
            return {}
        
        positions = np.array(track_history)
        
        # 計算軌跡長度
        total_distance = 0
        for i in range(1, len(positions)):
            total_distance += np.linalg.norm(positions[i] - positions[i-1])
        
        # 計算直線距離
        straight_distance = np.linalg.norm(positions[-1] - positions[0])
        
        # 計算軌跡複雜度
        complexity = total_distance / (straight_distance + 1e-6)
        
        # 計算主要運動方向
        direction_vector = positions[-1] - positions[0]
        direction_angle = np.arctan2(direction_vector[1], direction_vector[0]) * 180 / np.pi
        
        return {
            'total_distance': total_distance,
            'straight_distance': straight_distance,
            'complexity': complexity,
            'direction_angle': direction_angle,
            'duration': len(track_history)
        }

# 創建行為分析器
behavior_analyzer = BehaviorAnalyzer()
print("✅ 行為分析器初始化完成")

## 練習任務: 影片分析演示

In [None]:
def create_synthetic_video():
    """創建合成測試影片"""
    print("🎬 創建合成測試影片...")
    
    frames = []
    num_frames = 50
    
    # 創建移動物體
    for i in range(num_frames):
        frame = np.ones((400, 600, 3), dtype=np.uint8) * 100
        
        # 物體1: 水平移動
        x1 = int(50 + i * 10)
        cv2.circle(frame, (x1, 150), 20, (0, 255, 0), -1)
        
        # 物體2: 圓形軌跡
        angle = i * 0.3
        x2 = int(300 + 80 * np.cos(angle))
        y2 = int(200 + 80 * np.sin(angle))
        cv2.rectangle(frame, (x2-15, y2-15), (x2+15, y2+15), (255, 0, 0), -1)
        
        # 物體3: 隨機移動（模擬徘徊）
        if i > 0:
            x3 = int(100 + 20 * np.sin(i * 0.1) + np.random.randint(-5, 6))
            y3 = int(300 + 20 * np.cos(i * 0.1) + np.random.randint(-5, 6))
        else:
            x3, y3 = 100, 300
            
        cv2.circle(frame, (x3, y3), 15, (0, 0, 255), -1)
        
        # 添加幀編號
        cv2.putText(frame, f'Frame {i}', (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        
        frames.append(frame)
    
    print(f"✅ 創建了 {len(frames)} 幀合成影片")
    return frames

# 創建測試影片
test_frames = create_synthetic_video()

# 顯示前幾幀
sample_frames = test_frames[::10]  # 每10幀取一個
sample_titles = [f"幀 {i*10}" for i in range(len(sample_frames))]

display_multiple_images(sample_frames, sample_titles, figsize=(15, 8))

In [None]:
def process_video_frames(frames):
    """處理影片幀序列"""
    print("🔄 開始處理影片幀序列...")
    
    # 背景減除器
    backSub = cv2.createBackgroundSubtractorMOG2()
    
    tracking_results = []
    scene_changes = []
    
    for frame_idx, frame in enumerate(frames):
        # 場景變化檢測
        scene_changed, change_score = scene_detector.detect_scene_change(frame)
        scene_changes.append((frame_idx, scene_changed, change_score))
        
        # 動作檢測
        fg_mask = backSub.apply(frame)
        
        # 查找輪廓
        contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        
        # 提取質心
        centroids = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > 200:  # 過濾小區域
                M = cv2.moments(contour)
                if M["m00"] != 0:
                    cx = int(M["m10"] / M["m00"])
                    cy = int(M["m01"] / M["m00"])
                    centroids.append((cx, cy))
        
        # 更新追蹤器
        objects = tracker.update(centroids)
        
        # 記錄追蹤結果
        frame_result = {
            'frame_idx': frame_idx,
            'objects': dict(objects),
            'scene_changed': scene_changed,
            'change_score': change_score
        }
        tracking_results.append(frame_result)
        
        if frame_idx % 10 == 0:
            print(f"  處理幀 {frame_idx}: {len(objects)} 個物體")
    
    print("✅ 影片處理完成")
    return tracking_results, scene_changes

# 處理測試影片
tracking_results, scene_changes = process_video_frames(test_frames)

## 挑戰4: 結果分析和可視化

In [None]:
def analyze_tracking_results(tracking_results):
    """分析追蹤結果"""
    print("📊 分析追蹤結果...")
    
    # 統計物體數量變化
    object_counts = [len(result['objects']) for result in tracking_results]
    
    # 分析每個物體的行為
    object_behaviors = {}
    
    for obj_id, history in tracker.tracking_history.items():
        if len(history) >= 5:
            behavior = behavior_analyzer.analyze_motion_pattern(list(history))
            trajectory_analysis = behavior_analyzer.analyze_trajectory(list(history))
            is_loitering = behavior_analyzer.detect_loitering(list(history))
            
            object_behaviors[obj_id] = {
                'behavior': behavior,
                'trajectory': trajectory_analysis,
                'is_loitering': is_loitering,
                'track_length': len(history)
            }
    
    # 場景分析
    temporal_analysis = scene_detector.analyze_temporal_patterns()
    
    return {
        'object_counts': object_counts,
        'object_behaviors': object_behaviors,
        'temporal_analysis': temporal_analysis,
        'total_frames': len(tracking_results)
    }

# 分析結果
analysis = analyze_tracking_results(tracking_results)

print("📈 追蹤分析結果:")
print(f"  總幀數: {analysis['total_frames']}")
print(f"  最大物體數: {max(analysis['object_counts'])}")
print(f"  平均物體數: {np.mean(analysis['object_counts']):.1f}")

print("\n🎯 物體行為分析:")
for obj_id, behavior_data in analysis['object_behaviors'].items():
    print(f"  物體{obj_id}: {behavior_data['behavior']} "
          f"(軌跡長度: {behavior_data['track_length']}, "
          f"徘徊: {'是' if behavior_data['is_loitering'] else '否'})")
    
    if behavior_data['trajectory']:
        traj = behavior_data['trajectory']
        print(f"    總距離: {traj['total_distance']:.1f}, "
              f"複雜度: {traj['complexity']:.1f}, "
              f"方向: {traj['direction_angle']:.1f}°")

if analysis['temporal_analysis']:
    temp_analysis = analysis['temporal_analysis']
    print("\n📊 時序分析:")
    print(f"  場景變化次數: {temp_analysis['scene_changes']}")
    print(f"  平均變化程度: {temp_analysis['mean_change']:.3f}")
    print(f"  影片穩定性: {temp_analysis['stability']:.3f}")

In [None]:
# 創建追蹤可視化
def visualize_tracking_results(frames, tracking_results):
    """可視化追蹤結果"""
    print("🎨 創建追蹤可視化...")
    
    # 選擇關鍵幀進行可視化
    key_frames = [0, 15, 30, 45]
    
    visualizations = []
    titles = []
    
    for frame_idx in key_frames:
        if frame_idx < len(frames):
            frame = frames[frame_idx].copy()
            result = tracking_results[frame_idx]
            
            # 繪製檢測到的物體
            for obj_id, (cx, cy) in result['objects'].items():
                # 繪製物體中心
                cv2.circle(frame, (int(cx), int(cy)), 10, (0, 255, 255), -1)
                
                # 顯示物體ID
                cv2.putText(frame, str(obj_id), (int(cx)+15, int(cy)), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                
                # 繪製軌跡（如果有歷史）
                if obj_id in tracker.tracking_history:
                    history = list(tracker.tracking_history[obj_id])
                    if len(history) > 1:
                        pts = np.array(history, np.int32)
                        cv2.polylines(frame, [pts], False, (255, 0, 0), 2)
            
            # 添加場景變化指示
            if result['scene_changed']:
                cv2.putText(frame, "Scene Change!", (10, 60), 
                           cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
            
            visualizations.append(frame)
            titles.append(f"幀 {frame_idx}\n物體數: {len(result['objects'])}")
    
    display_multiple_images(visualizations, titles, figsize=(16, 8))

# 創建可視化
visualize_tracking_results(test_frames, tracking_results)

# 創建統計圖表
plt.figure(figsize=(15, 10))

# 子圖1: 物體數量隨時間變化
plt.subplot(2, 2, 1)
frame_indices = range(len(analysis['object_counts']))
plt.plot(frame_indices, analysis['object_counts'], 'b-', linewidth=2)
plt.title('物體數量隨時間變化')
plt.xlabel('幀編號')
plt.ylabel('物體數量')
plt.grid(True)

# 子圖2: 場景變化強度
plt.subplot(2, 2, 2)
change_scores = [score for _, _, score in scene_changes]
plt.plot(frame_indices, change_scores, 'r-', linewidth=2)
plt.axhline(y=scene_detector.threshold, color='orange', linestyle='--', label='變化閾值')
plt.title('場景變化強度')
plt.xlabel('幀編號')
plt.ylabel('變化分數')
plt.legend()
plt.grid(True)

# 子圖3: 軌跡複雜度分布
plt.subplot(2, 2, 3)
complexities = [data['trajectory'].get('complexity', 0) 
               for data in analysis['object_behaviors'].values() 
               if data['trajectory']]

if complexities:
    plt.hist(complexities, bins=10, alpha=0.7, color='green')
    plt.title('軌跡複雜度分布')
    plt.xlabel('複雜度')
    plt.ylabel('頻次')

# 子圖4: 行為類型統計
plt.subplot(2, 2, 4)
behaviors = [data['behavior'] for data in analysis['object_behaviors'].values()]
behavior_counts = {}
for behavior in behaviors:
    behavior_counts[behavior] = behavior_counts.get(behavior, 0) + 1

if behavior_counts:
    plt.bar(behavior_counts.keys(), behavior_counts.values(), color='purple')
    plt.title('行為類型統計')
    plt.ylabel('物體數量')
    plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

print("✅ 統計圖表已生成")

## 總結與評估

### 🎯 練習完成檢核
- [ ] 實現了多物體追蹤系統
- [ ] 完成了場景變化檢測
- [ ] 建立了行為分析框架
- [ ] 創建了完整的可視化系統
- [ ] 理解了時序分析的重要性

### 📊 技術收穫
1. **追蹤算法**: 理解質心追蹤、匈牙利算法、卡爾曼濾波
2. **行為分析**: 掌握運動模式識別、軌跡分析、異常檢測
3. **時序處理**: 學習影片數據的時間維度特性
4. **系統整合**: 將多種技術組合成完整系統

### 🚀 應用場景
- 智能監控系統
- 交通流量分析
- 體育比賽分析
- 動物行為研究
- 工業流程監控

### 📈 性能要求
- 實時處理: >20 FPS
- 追蹤準確率: >90%
- 記憶體使用: <500MB
- CPU使用率: <80%