In [2]:
!pip install ultralytics deep-sort-realtime numpy pandas tensorflow opencv-python

Collecting ultralytics
  Downloading ultralytics-8.3.107-py3-none-any.whl.metadata (37 kB)
Collecting deep-sort-realtime
  Downloading deep_sort_realtime-1.3.2-py3-none-any.whl.metadata (12 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.met

In [1]:
import cv2
import torch
import pandas as pd
import os
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

# 參數設定
csv_file = "./tracking_analysis/video_data.csv"  # CSV 檔案名稱
video_column = "video_file_path"  # 影片路徑欄位名稱
output_column = "video"  # 輸出 TXT 檔案名稱欄位
confidence_threshold = 0.5  # 偵測信心度閥值
save_processed_video = True  # 是否輸出經過 YOLOv8 + DeepSORT 處理的影片
process_all_videos = True  # 是否處理 CSV 中的所有影片
video_range = (2, 3)  # 當 process_all_videos 為 False 時，設定要處理的範圍

# 設定輸出資料夾
output_video_folder = "./tracking_analysis/deepsort_output/video"
output_txt_folder = "./tracking_analysis/deepsort_output/txt"

# 建立輸出資料夾
os.makedirs(output_txt_folder, exist_ok=True)
os.makedirs(output_video_folder, exist_ok=True)

df = pd.read_csv(csv_file)
if not process_all_videos:
    df = df.iloc[video_range[0]:video_range[1]]

yolo_model_path = "./training/yolov8/best.pt"  # 你的自訂 YOLOv8 模型路徑
model = YOLO(yolo_model_path)
tracker = DeepSort(max_age=0, n_init=3)  # 調整參數提高追蹤穩定性

for _, row in df.iterrows():
    video_path = row[video_column]

    # 設定輸出檔案的完整路徑
    output_txt = os.path.join(output_txt_folder, f"{row[output_column]}.txt")
    output_video_path = os.path.join(output_video_folder, f"{row[output_column]}_processed.mp4")

    cap = cv2.VideoCapture(video_path)
    frame_width = int(cap.get(3))
    frame_height = int(cap.get(4))
    fps = cap.get(cv2.CAP_PROP_FPS)

    if save_processed_video:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    frame_count = 0
    with open(output_txt, "w") as f:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            f.write(f"Frame #: {frame_count}\n")
            f.write(f"FPS: {fps:.2f}\n")

            results = model(frame)  # YOLOv8 偵測
            detections = []

            for result in results:
                for box in result.boxes.data:
                    x1, y1, x2, y2, conf, cls = box.cpu().numpy()
                    if conf >= confidence_threshold:
                        w = x2 - x1
                        h = y2 - y1
                        x = x1  # 使用左上角座標 x1, y1
                        y = y1
                        detections.append(((x, y, w, h), conf, int(cls)))  # 改為 (x, y, w, h)

            tracks = tracker.update_tracks(detections, frame=frame)

            for track in tracks:
                if not track.is_confirmed():
                    continue
                track_id = track.track_id
                x1, y1, x2, y2 = track.to_tlbr()
                class_name = "fish"  # 這裡假設所有物件都是魚，若有不同類別需額外處理
                f.write(f"Tracker ID: {track_id}, Class: {class_name},  BBox Coords (xmin, ymin, xmax, ymax): ({int(x1)}, {int(y1)}, {int(x2)}, {int(y2)})\n")

                if save_processed_video:
                    cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                    cv2.putText(frame, f"ID: {track_id}", (int(x1), int(y1)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            if save_processed_video:
                out.write(frame)

    cap.release()
    if save_processed_video:
        out.release()
cv2.destroyAllWindows()


[1;30;43m串流輸出內容已截斷至最後 5000 行。[0m
0: 384x640 9 fishs, 10.3ms
Speed: 2.6ms preprocess, 10.3ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 fishs, 10.6ms
Speed: 3.2ms preprocess, 10.6ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 fishs, 10.5ms
Speed: 2.8ms preprocess, 10.5ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 fishs, 10.5ms
Speed: 2.8ms preprocess, 10.5ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 fishs, 10.4ms
Speed: 2.8ms preprocess, 10.4ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 10 fishs, 10.5ms
Speed: 2.9ms preprocess, 10.5ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 fishs, 10.8ms
Speed: 2.8ms preprocess, 10.8ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 9 fishs, 10.1ms
Speed: 2.9ms preprocess, 10.1ms inference, 1.4ms postprocess p

error: OpenCV(4.11.0) /io/opencv/modules/highgui/src/window.cpp:1295: error: (-2:Unspecified error) The function is not implemented. Rebuild the library with Windows, GTK+ 2.x or Cocoa support. If you are on Ubuntu or Debian, install libgtk2.0-dev and pkg-config, then re-run cmake or configure script in function 'cvDestroyAllWindows'


讀取影片檔回傳解析度

In [None]:
import cv2

def get_video_resolution(video_path):
    """ 讀取影片並回傳解析度 (寬, 高) """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("無法開啟影片檔案")
        return None

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    return width, height

if __name__ == "__main__":
    video_path = "/content/drive/MyDrive/Final_Project/dataset/position_research/output_video/vid_1_processed.mp4"  # 請更改為你的影片檔案路徑
    resolution = get_video_resolution(video_path)
    if resolution:
        print(f"影片解析度: {resolution[0]}x{resolution[1]}")


影片解析度: 2304x1296


In [1]:
!pip install python-docx

Collecting python-docx
  Downloading python_docx-1.1.2-py3-none-any.whl.metadata (2.0 kB)
Downloading python_docx-1.1.2-py3-none-any.whl (244 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/244.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━[0m [32m235.5/244.3 kB[0m [31m6.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m244.3/244.3 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: python-docx
Successfully installed python-docx-1.1.2


In [2]:
### Cell 1: Import Libraries and Set Parameters
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial import ConvexHull
from docx import Document
from docx.shared import Inches
import os

# Set parameters
csv_file_path = "./tracking_analysis/video_data.csv"
output_txt_folder = "./tracking_analysis/result/research_output_txt/"
output_img_folder = "./tracking_analysis/result/research_output_chart_img/"
report_file = "./tracking_analysis/result/summary_report.docx"
img_width, img_height = 2304, 1296
real_width, real_height = 120, 60
right_region_x = img_width * 2 / 3


# Graph size parameters
graph_width = 20 # Adjustable width
graph_height = 6.5  # Adjustable height
font_size = 25  # Adjustable font size for labels
frame_cutoff = 100  # Adjustable frame cutoff value

# Ensure output folders exist
os.makedirs(output_txt_folder, exist_ok=True)
os.makedirs(output_img_folder, exist_ok=True)

In [3]:
### Cell 2: Define Functions
def parse_tracking_data(file_path):
    frame_data = {}
    current_frame = None
    with open(file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if line.startswith("Frame #:"):
                current_frame = int(line.split(": ")[1])
                frame_data[current_frame] = {}
            elif line.startswith("Tracker ID:"):
                parts = re.findall(r'\d+', line)
                if len(parts) >= 5 and current_frame is not None:
                    obj_id, x_min, y_min, x_max, y_max = map(int, parts[:5])
                    frame_data[current_frame][obj_id] = (x_min, y_min, x_max, y_max)
    return frame_data

def convert_to_real_world(bbox):
    xmin, ymin, xmax, ymax = bbox
    x_center = (xmin + xmax) / 2 * (real_width / img_width)
    y_center = (ymin + ymax) / 2 * (real_height / img_height)
    return x_center, y_center

def compute_avg_movement(frame_data):
    prev_positions = {}
    avg_movements = []
    total_avg_movement = 0

    for frame in sorted(frame_data.keys()):
        total_distance = 0
        count = 0
        current_positions = {}

        for tracker_id, bbox in frame_data[frame].items():
            real_pos = convert_to_real_world(bbox)
            current_positions[tracker_id] = real_pos
            if tracker_id in prev_positions:
                distance = np.linalg.norm(np.array(real_pos) - np.array(prev_positions[tracker_id]))
                total_distance += distance
                total_avg_movement += distance
                count += 1

        prev_positions = current_positions
        avg_movements.append(total_distance / count if count > 0 else 0)

    return avg_movements, total_avg_movement

def compute_center_movement(frame_data):
    prev_center = None
    center_movements = []
    total_center_movement = 0

    for frame in sorted(frame_data.keys()):
        points = [convert_to_real_world(bbox) for bbox in frame_data[frame].values()]
        if not points:
            center_movements.append(0)
            continue
        center = np.mean(points, axis=0)
        if prev_center is not None:
            movement = np.linalg.norm(center - prev_center)
            center_movements.append(movement)
            total_center_movement += movement
        else:
            center_movements.append(0)
        prev_center = center

    return center_movements, total_center_movement

def compute_density(frame_data):
    densities = []
    for frame in sorted(frame_data.keys()):
        points = [convert_to_real_world(bbox) for bbox in frame_data[frame].values()]
        if len(points) < 3:
            densities.append(0)
            continue
        hull = ConvexHull(points)
        density = hull.area / len(points)
        densities.append(density)
    return densities

def compute_right_region_ratio(frame_data):
    ratios = []
    for frame in sorted(frame_data.keys()):
        total_objects = len(frame_data[frame])
        right_objects = sum(1 for bbox in frame_data[frame].values() if (bbox[0] + bbox[2]) / 2 > right_region_x)
        ratio = right_objects / total_objects if total_objects > 0 else 0
        ratios.append(ratio)
    return ratios

def generate_report_word(video_data):
    doc = Document()
    doc.add_heading("Fish Tracking Analysis Report", level=1)
    for video_name, plots in video_data.items():
        doc.add_heading(f"Video: {video_name}", level=2)
        for plot in plots:
            doc.add_picture(plot, width=Inches(5))
    doc.save(report_file)
    print("Word report generated!")

def plot_results(video_name, results, max_values):
    plt.figure(figsize=(graph_width, graph_height * 4))
    colors = ['r', 'b', 'g', 'm']
    labels = ["Fish Group Average Movement per Frame", "Fish Group Central Point Tracking per Frame", "Fish Group Dispersion per frame", "Right-Side Foraging Ratio per Frame"]
    keys = ["avg_movements", "center_movements", "densities", "right_ratios"]

    for i, (key, label, color) in enumerate(zip(keys, labels, colors)):
        plt.subplot(4, 1, i+1)
        plt.plot(results[key], color=color, linewidth=3)
        plt.ylim(0, max_values[key] * 1.1)
        plt.grid(True, linestyle="--")
        plt.title(label, fontsize=font_size, fontweight='bold', pad=20)
        plt.xticks(fontsize=20)
        plt.yticks(fontsize=20)

    plt.tight_layout()
    plot_path = os.path.join(output_img_folder, f"{video_name}_combined.png")
    plt.savefig(plot_path, bbox_inches='tight')
    plt.close()
    return plot_path

def save_results(video_name, avg_movements, total_avg_movement, center_movements, total_center_movement, densities, right_ratios, output_folder):
    output_file = os.path.join(output_folder, f"{video_name}_calculate.txt")
    with open(output_file, "w") as f:
        f.write(f"Total Center Movement: {total_center_movement:.6f} cm\n")
        f.write(f"Total Average Movement: {total_avg_movement:.6f} cm\n")
        f.write("Frame\tAverage Movement (cm)\tDensity (cm²)\tCenter Movement (cm)\tRight Region Ratio\n")
        for i, (movement, density, center_movement, ratio) in enumerate(zip(avg_movements, densities, center_movements, right_ratios), start=1):
            f.write(f"{i}\t{movement:.6f}\t{density:.6f}\t{center_movement:.6f}\t{ratio:.6f}\n")
    return output_file

def adjust_frame_data(frame_data, frame_offset):
    """ 移除前面 frame_offset 幀，並將剩餘的資料重新索引 """
    adjusted_data = {}
    frames = sorted(frame_data.keys())

    for new_idx, old_idx in enumerate(frames[frame_offset:]):
        adjusted_data[new_idx] = frame_data[old_idx]

    return adjusted_data


In [4]:
def process_csv(csv_file_path, frame_offset=10):
    df = pd.read_csv(csv_file_path)
    video_data = {}
    max_values = {"avg_movements": 0, "center_movements": 0, "right_ratios": 0, "densities": 0}
    all_results = {}

    for _, row in df.iterrows():
        deepSort_txt_file = row['deepSort_txt_file_path']
        video_name = row['video']
        frame_data = parse_tracking_data(deepSort_txt_file)

        # 調整幀數，切掉 frame_offset 幀，重新索引
        frame_data = adjust_frame_data(frame_data, frame_offset)

        avg_movements, total_avg_movement = compute_avg_movement(frame_data)
        center_movements, total_center_movement = compute_center_movement(frame_data)
        densities = compute_density(frame_data)
        right_ratios = compute_right_region_ratio(frame_data)

        max_values = {k: max(max_values[k], max(v, default=0)) for k, v in zip(max_values.keys(), [avg_movements, center_movements, right_ratios, densities])}
        all_results[video_name] = {"avg_movements": avg_movements, "center_movements": center_movements, "densities": densities, "right_ratios": right_ratios}
        save_results(video_name, avg_movements, total_avg_movement, center_movements, total_center_movement, densities, right_ratios, output_txt_folder)


    for video_name, results in all_results.items():
        plot_path = plot_results(video_name, results, max_values)
        video_data[video_name] = [plot_path]

    generate_report_word(video_data)

process_csv(csv_file_path, frame_offset=100)  # 預設切掉前 10 幀


Word report generated!
