In [None]:
import cv2
import threading
from tracker import ObjectTracker
from zoom_handler import ZoomHandler

class VideoCaptureThread(threading.Thread):
    def __init__(self, tracker):
        super().__init__()
        self.cap = cv2.VideoCapture(0)  # 開啟webcam
        self.tracker = tracker  # 傳入的追蹤器對象
        self.running = True  # 控制循環的變量
        self.tracking_enabled = False  # 追蹤是否啟用
        self._check_camera()
        self.zoom_handler = ZoomHandler()
        
    def _check_camera(self):
        if not self.cap.isOpened():
            print("Error: Could not open webcam.")  # 錯誤提示
            exit()

    def run(self):
        cv2.namedWindow("Webcam")
        while self.running:
            ret, frame = self.cap.read()  # 讀取一幀影像

            if not ret:
                break

            # 如果啟用了追蹤，調用 tracker 的 update 方法
            frame = self.zoom_handler.apply_zoom(frame)
            if self.tracking_enabled:
                self.tracker.update_tracker(frame)
            
            cv2.imshow("Webcam", frame)  # 顯示影像
            
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q') or key == 27:  # 按下 'q' 或 'esc' 鍵退出
                self.running = False
            if key == ord('t') or key == ord('T'):  # 按下 't' 鍵切換追蹤
                self.tracking_enabled = not self.tracking_enabled
                if self.tracking_enabled:
                    print("Tracking enabled")  # 啟動追蹤
                    cv2.setMouseCallback("Webcam", self.tracker.draw_rectangle)
                else:
                    print("Tracking disabled")  # 停止追蹤
                    self.tracker.stop_tracking()
                    cv2.setMouseCallback("Webcam", lambda *args: None)  # 移除滑鼠回調          
            if key == ord('+'):
                self.zoom_handler.zoom_in()
            if key == ord('-'):
                self.zoom_handler.zoom_out()
            if key == ord('z') or key == ord('Z'):
                self.zoom_handler.zoom_reset()
                
        self.cap.release()
        cv2.destroyAllWindows()

def main():
    tracker = ObjectTracker(roi_size=30, tracker_type='CSRT', debug=True)  # 創建追蹤器對象
    video_thread = VideoCaptureThread(tracker)  # 創建主threading
    video_thread.start()  # 啟動主threading
    video_thread.join()  # 等待主threading結束

if __name__ == "__main__":
    main()


In [2]:
#tracker.py
# --------------------------------------------------------------
# Author: Thomas Huang
# Date: 2024-06-26
# Description: tracker
# --------------------------------------------------------------
import cv2

class ObjectTracker:
    def __init__(self, roi_size=40, tracker_type='CSRT', debug=False):
        self.center = None  # 中心點
        self.drawing = False  # 是否繪製矩形框
        self.tracking = False  # 是否正在追蹤
        self.tracker = None  # 追蹤器
        self.initBB = None  # 初始邊界框
        self.roi_size = roi_size  # ROI大小
        self.tracker_type = tracker_type  # 追蹤器類型
        self.debug = debug  # 調試模式
        self.frame = None  # 當前幀

    def draw_rectangle(self, event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN:
            self.center = (x, y)  # 設定中心點
            self.drawing = True  # 啟用繪製
            self.initBB = (self.center[0] - self.roi_size // 2, self.center[1] - self.roi_size // 2, self.roi_size, self.roi_size)  # 設定ROI的大小
            self.tracker = self._create_tracker()  # 創建追蹤器
            self.tracker.init(self.frame, self.initBB)  # 初始化追蹤器
            self.tracking = True  # 啟用追蹤

    def _create_tracker(self):
        # 根據不同類型創建追蹤器
        if self.tracker_type == 'CSRT':
            return cv2.TrackerCSRT_create()  # 創建CSRT追蹤器
        elif self.tracker_type == 'KCF':
            return cv2.TrackerKCF_create()  # 創建KCF追蹤器
        elif self.tracker_type == 'MIL':
            return cv2.TrackerMIL_create()  # 創建MIL追蹤器
        elif self.tracker_type == 'TLD':
            return cv2.legacy.TrackerTLD_create()  # 創建TLD追蹤器
        elif self.tracker_type == 'MOSSE':
            return cv2.legacy.TrackerMOSSE_create()  # 創建MOSSE追蹤器
        else:
            print(f"Error: Unknown tracker type {self.tracker_type}. Falling back to CSRT.")
            return cv2.TrackerCSRT_create()  # 默認創建CSRT追蹤器

    def update_tracker(self, frame):
        self.frame = frame  # 更新當前幀
        if self.tracking and self.tracker:
            success, box = self.tracker.update(frame)  # 更新追蹤器
            if success:
                (x, y, w, h) = [int(v) for v in box]
                self.center = (x + w // 2, y + h // 2)  # 更新中心點
                top_left = (self.center[0] - self.roi_size // 2, self.center[1] - self.roi_size // 2)
                bottom_right = (self.center[0] + self.roi_size // 2, self.center[1] + self.roi_size // 2)
                cv2.rectangle(frame, top_left, bottom_right, (255, 0, 0), 2)  # 繪製矩形框
            else:
                self.drawing = False  # 停止繪製
                self.tracking = False  # 停止追蹤
                self.center = None  # 清除中心點

        if self.debug:
            self._draw_debug_info()  # 繪製調試資訊

    def stop_tracking(self):
        self.tracking = False  # 停止追蹤
        self.tracker = None  # 清除追蹤器
        self.center = None  # 清除中心點

    def _draw_debug_info(self):
        if self.tracking:
            cv2.putText(self.frame, f"Tracking: {self.tracking}", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)  # 顯示追蹤狀態
            cv2.putText(self.frame, f"Center: {self.center}", (10, 40), cv2.FONT_H


In [None]:
# zoom_handler.py
# --------------------------------------------------------------
# Author: Thomas Huang
# Date: 2024-05-03
# Description: Zoom
# --------------------------------------------------------------

import cv2

class ZoomHandler:
    def __init__(self):
        self.zoom_factor = 1.0

    def zoom_in(self):
        self.zoom_factor += 0.1

    def zoom_out(self):
        if self.zoom_factor - 0.1 >= 1.0:
            self.zoom_factor -= 0.1

    def zoom_reset(self):
        self.zoom_factor = 1.0

    def apply_zoom(self, frame):
        h, w, _ = frame.shape
        zoomed_frame = cv2.resize(frame, None, fx=self.zoom_factor, fy=self.zoom_factor, interpolation=cv2.INTER_LINEAR)
        zoomed_h, zoomed_w, _ = zoomed_frame.shape
        x_start = (zoomed_w - w) // 2
        y_start = (zoomed_h - h) // 2
        x_end = x_start + w
        y_end = y_start + h
        return zoomed_frame[y_start:y_end, x_start:x_end]


In [6]:
#video_capture.py
# --------------------------------------------------------------
# Author: Thomas Huang
# Date: 2024-06-26
# Description: video_capture
# --------------------------------------------------------------

import cv2
import threading
from tracker import ObjectTracker  # 引入物件追蹤器模組
from zoom_handler import ZoomHandler  # 引入縮放處理模組
from image_handler import ImageHandler  # 引入影像處理模組

class VideoCaptureThread(threading.Thread):
    def __init__(self, tracker, image_handler):
        super().__init__()
        self.tracker = tracker  # 追蹤器對象
        self.image_handler = image_handler  # 影像處理器對象
        self.running = True  # 控制循環的變量
        self.tracking_enabled = False  # 追蹤是否啟用
        self.zoom_handler = ZoomHandler()  # 縮放處理器對象
    
    def run(self):
        # 創建顯示視窗
        cv2.namedWindow("Webcam")
        while self.running:
            # 讀取影像
            ret, frame = self.image_handler.read_frame()
            if not ret:
                break
            
            # 應用縮放處理
            frame = self.zoom_handler.apply_zoom(frame)
            if self.tracking_enabled:
                # 更新追蹤器
                self.tracker.update_tracker(frame)
            
            # 顯示影像
            cv2.imshow("Webcam", frame)
            
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q') or key == 27:
                # 按下 'q' 或 'esc' 鍵退出
                self.running = False
            if key == ord('t') or key == ord('T'):
                # 按下 't' 鍵切換追蹤
                self.tracking_enabled = not self.tracking_enabled
                if self.tracking_enabled:
                    print("Tracking enabled")  # 啟動追蹤
                    cv2.setMouseCallback("Webcam", self.tracker.draw_rectangle)
                else:
                    print("Tracking disabled")  # 停止追蹤
                    self.tracker.stop_tracking()
                    cv2.setMouseCallback("Webcam", lambda *args: None)  # 移除滑鼠回調
            if key == ord('+'):
                # 按下 '+' 鍵放大
                self.zoom_handler.zoom_in()
            if key == ord('-'):
                # 按下 '-' 鍵縮小
                self.zoom_handler.zoom_out()
            if key == ord('z') or key == ord('Z'):
                # 按下 'z' 鍵重置縮放
                self.zoom_handler.zoom_reset()
            if key == ord('s') or key == ord('S'):
                # 按下 's' 鍵保存當前影像
                self.image_handler.save_frame(frame, "saved_frame.jpg")
                
        self.image_handler.release()
        cv2.destroyAllWindows()

def main():
    tracker = ObjectTracker(roi_size=30, tracker_type='CSRT', debug=True)  # 創建追蹤器對象
    image_handler = ImageHandler(camera_index=0)  # 創建影像處理器對象
    video_thread = VideoCaptureThread(tracker, image_handler)  # 創建主 threading
    video_thread.start()  # 啟動主 threading
    video_thread.join()  # 等待主 threading 結束

if __name__ == "__main__":
    main()


In [None]:
#image_handler.py
# --------------------------------------------------------------
# Author: Thomas Huang
# Date: 2024-06-26
# Description: image_handler
# --------------------------------------------------------------
import cv2

class ImageHandler:
    def __init__(self, camera_index=0):
        # 初始化相機
        self.cap = cv2.VideoCapture(camera_index)
        self._check_camera()
    
    def _check_camera(self):
        # 檢查相機是否打開
        if not self.cap.isOpened():
            print("Error: Could not open webcam.")  # 錯誤提示
            exit()
    
    def read_frame(self):
        # 讀取一幀影像
        ret, frame = self.cap.read()
        if not ret:
            print("Error: Could not read frame from webcam.")  # 錯誤提示
        return ret, frame
    
    def release(self):
        # 釋放相機資源
        self.cap.release()
    
    def save_frame(self, frame, filename):
        # 保存當前影像
        cv2.imwrite(filename, frame)


In [None]:
import numpy as np
import cv2
import threading
from copy import deepcopy

thread_lock = threading.Lock()
thread_exit = False

class CameraThread(threading.Thread):
    def __init__(self, camera_id, img_height, img_width):
        super(CameraThread, self).__init__()
        self.camera_id = camera_id  # 相機 ID
        self.img_height = img_height  # 影像高度
        self.img_width = img_width  # 影像寬度
        self.frame = np.zeros((img_height, img_width, 3), dtype=np.uint8)  # 初始化影像幀

    def get_frame(self):
        # 獲取影像幀的副本
        return deepcopy(self.frame)

    def run(self):
        global thread_exit
        cap = cv2.VideoCapture(self.camera_id)  # 開啟相機
        while not thread_exit:
            ret, frame = cap.read()  # 讀取影像幀
            if ret:
                frame = cv2.resize(frame, (self.img_width, self.img_height))  # 調整影像大小
                thread_lock.acquire()
                self.frame = frame  # 更新影像幀
                thread_lock.release()
            else:
                thread_exit = True
        cap.release()  # 釋放相機


In [1]:
import cv2
from camera_thread import CameraThread, thread_lock, thread_exit

def main():
    global thread_exit
    camera_id = 0  # 相機 ID
    img_height = 480  # 影像高度
    img_width = 640  # 影像寬度

    thread = CameraThread(camera_id, img_height, img_width)  # 創建相機執行緒
    thread.start()  # 啟動相機執行緒

    while not thread_exit:
        thread_lock.acquire()
        frame = thread.get_frame()  # 獲取影像幀
        thread_lock.release()

        cv2.imshow('Video', frame)  # 顯示影像幀
        if cv2.waitKey(1) & 0xFF == ord('q'):
            thread_exit = True  # 按下 'q' 鍵退出

    thread.join()  # 等待相機執行緒結束
    cv2.destroyAllWindows()  # 關閉所有視窗

if __name__ == "__main__":
    main()


ModuleNotFoundError: No module named 'camera_thread'