In [7]:
import csv
import matplotlib
matplotlib.use('Agg')  # Set non-interactive backend
from tqdm import tqdm
import cv2
import mediapipe as mp
import matplotlib.pyplot as plt
import numpy as np

def process_video_to_video(input_video_path, output_video_path, output_csv_path='landmarks.csv', fps=None):
    # Initialize MediaPipe Pose
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(static_image_mode=False,
                        model_complexity=2,
                        smooth_landmarks=True,
                        min_detection_confidence=0.5,
                        min_tracking_confidence=0.5)

    # Open input video
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print("Error opening video file")
        return

    # Get video properties
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    input_fps = cap.get(cv2.CAP_PROP_FPS)
    fps = fps if fps is not None else input_fps if input_fps else 30

    # Initialize VideoWriter
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # Set up CSV file for saving landmarks
    csv_file = open(output_csv_path, 'w', newline='')
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow(['frame_number', 'landmark_id', 'x', 'y', 'z', 'visibility'])

    # Set up Matplotlib figure
    fig = plt.figure(figsize=(5, 5))
    ax = fig.add_subplot(111, projection='3d')
    
    # Initialize progress tracking
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    pbar = tqdm(total=total_frames, desc="Processing video")
    frame_number = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Process frame with MediaPipe
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(frame_rgb)

        # Clear previous plot
        ax.cla()
        ax.set_xlim3d(-1, 1)
        ax.set_ylim3d(-1, 1)
        ax.set_zlim3d(-1, 1)
        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')

        if results.pose_world_landmarks:
            # Plot landmarks
            plot_world_landmarks(plt, ax, results.pose_world_landmarks)
            
            # Save landmarks to CSV
            for idx, landmark in enumerate(results.pose_world_landmarks.landmark):
                csv_writer.writerow([
                    frame_number,
                    idx,
                    landmark.x,
                    landmark.y,
                    landmark.z,
                    landmark.visibility
                ])
        else:
            # Write empty entry for missing detection
            csv_writer.writerow([frame_number, -1, None, None, None, None])

        # Convert plot to image
        fig.canvas.draw()
        pose_img = np.array(fig.canvas.buffer_rgba())
        pose_img = cv2.cvtColor(pose_img, cv2.COLOR_RGBA2BGR)
        pose_img = cv2.resize(pose_img, (width, height))

        # Overlay visualization
        combined_frame = cv2.addWeighted(frame, 0.7, pose_img, 0.3, 0)
        video_writer.write(combined_frame)

        frame_number += 1
        pbar.update(1)

    # Cleanup resources
    cap.release()
    video_writer.release()
    csv_file.close()
    plt.close(fig)
    pbar.close()
    print(f"Processed video saved to: {output_video_path}")
    print(f"Landmarks data saved to: {output_csv_path}")



In [8]:
def plot_world_landmarks(
    plt,
    ax,
    landmarks,
    visibility_th=0.5,
):
    landmark_point = []

    for index, landmark in enumerate(landmarks.landmark):
        landmark_point.append(
            [landmark.visibility, (landmark.x, landmark.y, landmark.z)])

    face_index_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    right_arm_index_list = [11, 13, 15, 17, 19, 21]
    left_arm_index_list = [12, 14, 16, 18, 20, 22]
    right_body_side_index_list = [11, 23, 25, 27, 29, 31]
    left_body_side_index_list = [12, 24, 26, 28, 30, 32]
    shoulder_index_list = [11, 12]
    waist_index_list = [23, 24]

    # 顔
    face_x, face_y, face_z = [], [], []
    for index in face_index_list:
        point = landmark_point[index][1]
        face_x.append(point[0])
        face_y.append(point[2])
        face_z.append(point[1] * (-1))

    # 右腕
    right_arm_x, right_arm_y, right_arm_z = [], [], []
    for index in right_arm_index_list:
        point = landmark_point[index][1]
        right_arm_x.append(point[0])
        right_arm_y.append(point[2])
        right_arm_z.append(point[1] * (-1))

    # 左腕
    left_arm_x, left_arm_y, left_arm_z = [], [], []
    for index in left_arm_index_list:
        point = landmark_point[index][1]
        left_arm_x.append(point[0])
        left_arm_y.append(point[2])
        left_arm_z.append(point[1] * (-1))

    # 右半身
    right_body_side_x, right_body_side_y, right_body_side_z = [], [], []
    for index in right_body_side_index_list:
        point = landmark_point[index][1]
        right_body_side_x.append(point[0])
        right_body_side_y.append(point[2])
        right_body_side_z.append(point[1] * (-1))

    # 左半身
    left_body_side_x, left_body_side_y, left_body_side_z = [], [], []
    for index in left_body_side_index_list:
        point = landmark_point[index][1]
        left_body_side_x.append(point[0])
        left_body_side_y.append(point[2])
        left_body_side_z.append(point[1] * (-1))

    # 肩
    shoulder_x, shoulder_y, shoulder_z = [], [], []
    for index in shoulder_index_list:
        point = landmark_point[index][1]
        shoulder_x.append(point[0])
        shoulder_y.append(point[2])
        shoulder_z.append(point[1] * (-1))

    # 腰
    waist_x, waist_y, waist_z = [], [], []
    for index in waist_index_list:
        point = landmark_point[index][1]
        waist_x.append(point[0])
        waist_y.append(point[2])
        waist_z.append(point[1] * (-1))
            
    ax.cla()
    ax.set_xlim3d(-1, 1)
    ax.set_ylim3d(-1, 1)
    ax.set_zlim3d(-1, 1)

    ax.scatter(face_x, face_y, face_z)
    ax.plot(right_arm_x, right_arm_y, right_arm_z)
    ax.plot(left_arm_x, left_arm_y, left_arm_z)
    ax.plot(right_body_side_x, right_body_side_y, right_body_side_z)
    ax.plot(left_body_side_x, left_body_side_y, left_body_side_z)
    ax.plot(shoulder_x, shoulder_y, shoulder_z)
    ax.plot(waist_x, waist_y, waist_z)
    
    plt.pause(.001)

    return


In [9]:
process_video_to_video(r"E:\ml_shiz\hackenza_quidich\Quidich-HACKATHON-25\2_crop.mp4", "output_new_2.mp4", "cropped_landmarks_2.csv")


Downloading model to e:\ml_shiz\hackenza_quidich\quidich_env\lib\site-packages\mediapipe/modules/pose_landmark/pose_landmark_heavy.tflite


  plt.pause(.001)
Processing video: 100%|██████████| 506/506 [02:18<00:00,  3.65it/s]

Processed video saved to: output_new_2.mp4
Landmarks data saved to: cropped_landmarks_2.csv





In [10]:
process_video_to_video(r"E:\ml_shiz\hackenza_quidich\Quidich-HACKATHON-25\1_crop.mp4", "output_new_1.mp4", "cropped_landmarks_1.csv")

  plt.pause(.001)
Processing video: 100%|██████████| 506/506 [02:05<00:00,  4.04it/s]

Processed video saved to: output_new_1.mp4
Landmarks data saved to: cropped_landmarks_1.csv



