# DAY 1

## Import the Required Libraries

In [None]:
import os
import cv2
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import seaborn as sns
from scipy.signal import resample

import mediapipe as mp
from mediapipe.python.solutions.drawing_utils import draw_landmarks
from mediapipe.python.solutions.drawing_styles import get_default_pose_landmarks_style

import yt_dlp

from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
from dtaidistance import dtw

## 1. Keypoint Extraction from the Video

In [None]:
def extract_keypoints_from_video(video_path, output_csv, max_frames=None, start_sec=None, end_sec=None, debug_dir=None):
    """Run MediaPipe Pose over a video and write one CSV row of landmarks per frame.

    The CSV has a ``frame`` column followed by ``x{i}, y{i}, z{i}`` for each of
    the 33 landmarks. Frames in which no pose is detected are written as zeros
    so that the row count still matches the number of processed frames.

    Args:
        video_path: Path of the video file to read.
        output_csv: Destination CSV path; its parent directory is created when
            the path has one.
        max_frames: Stop after this many rows have been written. ``None`` or
            ``0`` means no limit.
        start_sec: Skip frames whose timestamp is before this many seconds.
            ``None`` or ``0`` means start at the first frame.
        end_sec: Stop at the first frame whose timestamp exceeds this many
            seconds. ``None`` or ``0`` means read to the end.
        debug_dir: When given, every 30th processed frame is saved there as a
            JPEG with the detected skeleton drawn on it.

    Raises:
        IOError: If the video cannot be opened.
        ValueError: If the video reports a non-positive frame rate.
    """
    num_landmarks = 33  # width of the header and of the zero-filled fallback row

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise IOError(f"Could not open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            # Timestamps are derived from frame_idx / fps, so fps must be usable.
            raise ValueError(f"Video reports an invalid FPS ({fps}): {video_path}")
        total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        duration = total_frames / fps

        print(f"\n[INFO] Processing video: {video_path}")
        print(f"[INFO] FPS: {fps:.2f}, Total Frames: {int(total_frames)}, Duration: {duration:.2f}s")

        # dirname is '' for a bare filename, and os.makedirs('') raises.
        csv_dir = os.path.dirname(output_csv)
        if csv_dir:
            os.makedirs(csv_dir, exist_ok=True)
        if debug_dir:
            os.makedirs(debug_dir, exist_ok=True)

        frame_idx = 0
        saved_idx = 0
        first_frame_time, last_frame_time = None, None

        mp_pose = mp.solutions.pose
        # Both the Pose graph and the CSV handle are released even on error.
        with mp_pose.Pose() as pose, open(output_csv, mode='w', newline='') as csv_file:
            writer = csv.writer(csv_file)
            header = ['frame']
            for i in range(num_landmarks):
                header += [f'x{i}', f'y{i}', f'z{i}']
            writer.writerow(header)

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                time_sec = frame_idx / fps
                frame_idx += 1

                if start_sec and time_sec < start_sec:
                    continue
                if end_sec and time_sec > end_sec:
                    break
                if max_frames and saved_idx >= max_frames:
                    break

                if first_frame_time is None:
                    first_frame_time = time_sec
                last_frame_time = time_sec

                # OpenCV decodes to BGR; the frame is converted to RGB for Pose.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                result = pose.process(frame_rgb)

                row = [saved_idx]
                if result.pose_landmarks:
                    for lm in result.pose_landmarks.landmark:
                        row += [lm.x, lm.y, lm.z]
                else:
                    row += [0.0] * (num_landmarks * 3)
                writer.writerow(row)

                if debug_dir and saved_idx % 30 == 0:
                    debug_frame = frame.copy()
                    if result.pose_landmarks:
                        draw_landmarks(
                            debug_frame,
                            result.pose_landmarks,
                            mp_pose.POSE_CONNECTIONS,
                            landmark_drawing_spec=get_default_pose_landmarks_style()
                        )
                    debug_path = os.path.join(debug_dir, f"frame_{saved_idx}_pose.jpg")
                    cv2.imwrite(debug_path, debug_frame)

                saved_idx += 1
    finally:
        cap.release()

    print(f"[INFO] Keypoints saved to {output_csv}")
    print(f"[INFO] Frames processed: {saved_idx}")
    if first_frame_time is None:
        # Formatting None with :.2f would raise; report the empty window instead.
        print("[INFO] Time range: no frames fell inside the requested window\n")
    else:
        print(f"[INFO] Time range: {first_frame_time:.2f}s to {last_frame_time:.2f}s\n")



## 2. DTW Score Comparison 

In [None]:
def load_keypoints_from_csv(csv_file):
    """Load a keypoint CSV into a 2-D float array, dropping the frame column.

    The first line is treated as a header and skipped; the first field of each
    remaining row (the frame index) is discarded. Blank lines are ignored, and
    an empty or header-only file yields an empty array instead of raising.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        ``np.ndarray`` with one row per data line of the file.
    """
    with open(csv_file, 'r', newline='') as f:
        reader = csv.reader(f)
        # A default keeps a completely empty file from raising StopIteration.
        next(reader, None)
        # csv.reader yields [] for blank lines; keeping them would make the
        # resulting array ragged.
        keypoints = [[float(value) for value in row[1:]] for row in reader if row]
    return np.array(keypoints)


def compute_dtw_distance(seq1, seq2):
    """Run ``fastdtw`` on two sequences using Euclidean point distance.

    Returns:
        ``(distance, path)`` exactly as produced by ``fastdtw``.
    """
    total_cost, warp_path = fastdtw(seq1, seq2, dist=euclidean)
    return total_cost, warp_path

## 3. Preprocess Keypoints

In [None]:


def preprocess_keypoints(csv_path, selected_indices):
    """Load a keypoint CSV and return root-centred, unit-norm 2-D poses.

    For each CSV row the ``x`` and ``y`` values of the landmarks listed in
    ``selected_indices`` are gathered (``z`` is ignored), translated so the
    first selected landmark sits at the origin, and scaled to unit Euclidean
    norm. Rows whose centred pose is all zeros are left as zeros.

    Args:
        csv_path: CSV whose first column is a frame index followed by
            ``x, y, z`` triples per landmark.
        selected_indices: Landmark indices to keep, in output order. The first
            one is used as the root.

    Returns:
        ``np.ndarray`` of shape ``(n_frames, 2 * len(selected_indices))``.

    Raises:
        ValueError: If ``selected_indices`` is empty.
    """
    selected = list(selected_indices)
    if not selected:
        raise ValueError("selected_indices must contain at least one landmark index")

    df = pd.read_csv(csv_path)
    # Drop the leading frame-index column and work on the raw float matrix.
    coords = df.iloc[:, 1:].to_numpy(dtype=float)

    # Landmark i occupies columns 3*i (x), 3*i + 1 (y), 3*i + 2 (z).
    columns = [3 * idx + axis for idx in selected for axis in (0, 1)]
    poses = coords[:, columns]

    # Subtract the root landmark's (x, y) from every landmark of the same frame.
    root = poses[:, :2]
    poses = poses - np.tile(root, len(selected))

    norms = np.linalg.norm(poses, axis=1, keepdims=True)
    # Divide zero-norm rows by 1 so they stay all-zero instead of becoming NaN.
    safe_norms = np.where(norms == 0, 1.0, norms)
    return poses / safe_norms


def align_and_resample(seq1, seq2):
    """Resample both sequences to the length of the shorter one.

    Each sequence is passed through ``scipy.signal.resample`` with the common
    target length.

    Returns:
        ``(seq1_resampled, seq2_resampled)``.
    """
    target_len = min(map(len, (seq1, seq2)))
    first = resample(seq1, target_len)
    second = resample(seq2, target_len)
    return first, second

In [None]:
def download_youtube_video(url, output_path="video.mp4"):
    """Download a video with yt-dlp and save it as an MP4.

    Args:
        url: Address of the video to download.
        output_path: Output template passed to yt-dlp as ``outtmpl``.

    Returns:
        ``output_path`` on success, or ``None`` if any exception occurred (the
        error is printed, not raised).
    """
    options = dict(
        format='bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
        outtmpl=output_path,
        merge_output_format='mp4',
        quiet=False,
    )
    try:
        with yt_dlp.YoutubeDL(options) as downloader:
            downloader.download([url])
        print(f"[INFO] Downloaded video to: {output_path}")
    except Exception as err:
        print(f"[ERROR] Failed to download {url}: {err}")
        return None
    return output_path

In [None]:

if __name__ == "__main__":
    # Day 1 pipeline: download both videos, extract keypoints, preprocess them,
    # and print the DTW distance between the two pose sequences.

    # MediaPipe Pose landmark numbering: 11/13/15 = left shoulder/elbow/wrist,
    # 23/25/27 = left hip/knee/ankle. The first entry is the normalisation root.
    SELECTED_INDICES = [11, 13, 15, 23, 25, 27]
    OUTPUT_DIR = "outputs"
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    video1_url = "https://www.youtube.com/watch?v=V0UDCppaxEw"
    video2_url = "https://www.youtube.com/watch?v=zC_F23Rxu-A"

    video1_path = "video1.mp4"
    video2_path = "video2.mp4"
    output_csv1 = os.path.join(r"../data", "video1_keypoints.csv")
    output_csv2 = os.path.join(r"../data", "video2_keypoints.csv")

    # download_youtube_video returns None on failure; stop here with a clear
    # message rather than failing later inside keypoint extraction.
    if not os.path.exists(video1_path):
        if download_youtube_video(video1_url, video1_path) is None:
            raise RuntimeError(f"Could not download {video1_url}")
    if not os.path.exists(video2_path):
        if download_youtube_video(video2_url, video2_path) is None:
            raise RuntimeError(f"Could not download {video2_url}")

    extract_keypoints_from_video(video1_path, output_csv1, debug_dir="debug_frames/video1")
    # Only the 21s-31s window of the second video is compared.
    extract_keypoints_from_video(video2_path, output_csv2, start_sec=21, end_sec=31, debug_dir="debug_frames/video2")

    print("[INFO] Preprocessing keypoints...")
    seq1 = preprocess_keypoints(output_csv1, SELECTED_INDICES)
    seq2 = preprocess_keypoints(output_csv2, SELECTED_INDICES)

    # Reuse the helpers defined above instead of repeating their bodies inline.
    seq1, seq2 = align_and_resample(seq1, seq2)

    dtw_distance, dtw_path = compute_dtw_distance(seq1, seq2)
    print(f"DTW Distance Score: {round(dtw_distance, 2)}")


# DAY 2

## 1. Time-Series Variation Plot

In [None]:

def plot_time_series_variation(seq1, seq2, dtw_path, output_path="outputs/time_series_variation.png"):
    """Plot the Euclidean distance between matched frames along the DTW path.

    Args:
        seq1: First pose sequence, indexable by frame.
        seq2: Second pose sequence, indexable by frame.
        dtw_path: Iterable of ``(i, j)`` pairs matching ``seq1[i]`` to ``seq2[j]``.
        output_path: Where the figure is saved. Defaults to the location this
            function previously hard-coded.
    """
    variations = [np.linalg.norm(seq1[i] - seq2[j]) for i, j in dtw_path]

    plt.figure(figsize=(10, 4))
    plt.plot(variations, color='orange', linewidth=2)
    plt.title("Time-Series Pose Variation Over Aligned Frames")
    plt.xlabel("Alignment Step")
    plt.ylabel("Euclidean Distance")
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(output_path)
    plt.show()

## 2. DTW Cost Matrix Heatmap with Alignment Path

In [None]:
def plot_alignment_heatmap_with_values(seq1, seq2, dtw_path, output_path="outputs/heatmap_with_values.png"):
    """Draw the pairwise frame-distance matrix as a heatmap with the DTW path on top.

    Args:
        seq1: First pose sequence (rows of the heatmap).
        seq2: Second pose sequence (columns of the heatmap).
        dtw_path: Iterable of ``(i, j)`` pairs matching ``seq1[i]`` to ``seq2[j]``.
        output_path: Where the figure is saved.
    """
    # Local import: cdist is not among the names imported at the top of the file.
    from scipy.spatial.distance import cdist

    cost_matrix = cdist(seq1, seq2, metric='euclidean')

    plt.figure(figsize=(9, 7))
    ax = sns.heatmap(cost_matrix, cmap="magma", cbar=True, xticklabels=30, yticklabels=30)
    ax.set_title("DTW Cost Matrix with Alignment Path")
    # Heatmap cell (i, j) spans [j, j+1] x [i, i+1], so its centre is at
    # (j + 0.5, i + 0.5). Without the offset the path runs along cell corners.
    path_x = [j + 0.5 for _, j in dtw_path]
    path_y = [i + 0.5 for i, _ in dtw_path]
    plt.plot(path_x, path_y, color='lime', linewidth=2, alpha=0.8)
    plt.xlabel("Video 2 Frame Index")
    plt.ylabel("Video 1 Frame Index")
    plt.tight_layout()
    plt.savefig(output_path)
    plt.show()


## 3. DTW Alignment Path (Scatter Plot / Line Trace)

In [None]:
def plot_alignment_path_scatter(dtw_path, output_path="outputs/dtw_alignment_path_scatter.png"):
    """Plot the DTW path as a line of video-2 frame index against video-1 frame index.

    Args:
        dtw_path: Iterable of ``(i, j)`` frame-index pairs.
        output_path: Where the figure is saved.
    """
    # Transpose the list of pairs into one tuple of indices per video.
    v1_indices, v2_indices = zip(*dtw_path)

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.plot(v1_indices, v2_indices, color="red", linewidth=2)
    ax.set_title("DTW Alignment Path (Frame Matching)")
    ax.set_xlabel("Video 1 Frame Index")
    ax.set_ylabel("Video 2 Frame Index")
    ax.grid(True)
    fig.tight_layout()
    fig.savefig(output_path)
    plt.show()


## 4. Frame by Frame Match Trace

In [None]:
def plot_frame_match_trace(dtw_path, output_path="outputs/frame_match_trace.png"):
    """Scatter the matched frame index of each video against the alignment step.

    Args:
        dtw_path: Iterable of ``(i, j)`` frame-index pairs.
        output_path: Where the figure is saved.
    """
    v1, v2 = zip(*dtw_path)

    fig, ax = plt.subplots(figsize=(10, 4))
    # One series per video, both plotted against the position along the path.
    series = (
        (v1, "Video 1", 'blue'),
        (v2, "Video 2", 'orange'),
    )
    for frames, label, colour in series:
        ax.scatter(range(len(frames)), frames, s=10, label=label, color=colour, alpha=0.6)
    ax.set_title("Matched Frame Indices Along DTW Path")
    ax.set_xlabel("Alignment Step")
    ax.set_ylabel("Frame Index")
    ax.legend()
    fig.tight_layout()
    fig.savefig(output_path)
    plt.show()


## OpenAI Content Generation

In [None]:
import openai
import base64
import os

# Read the API key from the OPENAI_API_KEY environment variable instead of
# embedding a secret in the source file.
client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

def encode_image_to_base64(image_path):
    """Return the contents of the file at ``image_path`` as a base64 text string."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")

def generate_gpt_conclusion_from_dtw(dtw_score, output_dir="outputs"):
    """Send the DTW score and the saved plots to the chat model and print its analysis.

    Each plot found in ``output_dir`` is attached to the user message as a
    base64 ``data:`` URL. Plots that are missing are reported and skipped.

    Args:
        dtw_score: DTW distance to quote in the system prompt.
        output_dir: Directory containing the PNG files written by the plot
            functions with their default file names.

    Returns:
        The text of the model's reply (also printed).
    """
    images = {
        "variation_plot": os.path.join(output_dir, "time_series_variation.png"),
        "heatmap": os.path.join(output_dir, "heatmap_with_values.png"),
        # Must match the default file name used by plot_alignment_path_scatter;
        # the previous name never existed, so this plot was never attached.
        "alignment_scatter": os.path.join(output_dir, "dtw_alignment_path_scatter.png"),
        "frame_trace": os.path.join(output_dir, "frame_match_trace.png"),
    }

    attachments = []
    for name, path in images.items():
        if not os.path.exists(path):
            # Make a missing plot visible; the prompt still lists all four.
            print(f"[WARN] Plot '{name}' not found at {path}; it will not be attached")
            continue
        img_data = encode_image_to_base64(path)
        attachments.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:image/png;base64,{img_data}"
            }
        })

    system_prompt = (
        "You are an expert in time-series pose alignment using computer vision. "
        f"The user has computed DTW alignment between two yoga pose sequences with a DTW score of {dtw_score}. "
        "The user has shared the following plots:\n"
        "- Time-series variation plot\n"
        "- DTW cost matrix heatmap with alignment path\n"
        "- DTW alignment scatter plot\n"
        "- Frame match trace plot\n\n"
        "Analyze and summarize:\n"
        "- Whether the sequences are well-aligned\n"
        "- What the DTW path tells us\n"
        "- Any notable variations or mismatches"
    )

    response = client.chat.completions.create(
        model="gpt-4-turbo",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": attachments}
        ],
        max_tokens=500
    )

    analysis = response.choices[0].message.content
    print("\n GPT-4-Vision Analysis:\n")
    print(analysis)
    return analysis


# Main Pipeline for Running Day 2 Deliverables

In [None]:
def download_youtube_video(url, output_path="video.mp4"):
    """Download a video with yt-dlp and save it as an MP4.

    This re-declares the function of the same name defined earlier in the
    file, with the same behaviour.

    Args:
        url: Address of the video to download.
        output_path: Output template passed to yt-dlp as ``outtmpl``.

    Returns:
        ``output_path`` on success, or ``None`` if any exception occurred (the
        error is printed, not raised).
    """
    options = dict(
        format='bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4',
        outtmpl=output_path,
        merge_output_format='mp4',
        quiet=False,
    )
    try:
        with yt_dlp.YoutubeDL(options) as downloader:
            downloader.download([url])
        print(f"[INFO] Downloaded video to: {output_path}")
    except Exception as err:
        print(f"[ERROR] Failed to download {url}: {err}")
        return None
    return output_path

In [None]:

if __name__ == "__main__":
    # Day 2 pipeline: repeat the Day 1 comparison, then render the four
    # diagnostic plots and ask the chat model to interpret them.

    # MediaPipe Pose landmark numbering: 11/13/15 = left shoulder/elbow/wrist,
    # 23/25/27 = left hip/knee/ankle. The first entry is the normalisation root.
    SELECTED_INDICES = [11, 13, 15, 23, 25, 27]
    OUTPUT_DIR = "outputs"
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    video1_url = "https://www.youtube.com/watch?v=V0UDCppaxEw"
    video2_url = "https://www.youtube.com/watch?v=zC_F23Rxu-A"

    video1_path = "video1.mp4"
    video2_path = "video2.mp4"
    output_csv1 = os.path.join(r"../data", "video1_keypoints.csv")
    output_csv2 = os.path.join(r"../data", "video2_keypoints.csv")

    # download_youtube_video returns None on failure; stop here with a clear
    # message rather than failing later inside keypoint extraction.
    if not os.path.exists(video1_path):
        if download_youtube_video(video1_url, video1_path) is None:
            raise RuntimeError(f"Could not download {video1_url}")
    if not os.path.exists(video2_path):
        if download_youtube_video(video2_url, video2_path) is None:
            raise RuntimeError(f"Could not download {video2_url}")

    extract_keypoints_from_video(video1_path, output_csv1, debug_dir="debug_frames/video1")
    # Only the 21s-31s window of the second video is compared.
    extract_keypoints_from_video(video2_path, output_csv2, start_sec=21, end_sec=31, debug_dir="debug_frames/video2")

    print("[INFO] Preprocessing keypoints...")
    seq1 = preprocess_keypoints(output_csv1, SELECTED_INDICES)
    seq2 = preprocess_keypoints(output_csv2, SELECTED_INDICES)

    # Reuse the helpers defined earlier instead of repeating their bodies inline.
    seq1, seq2 = align_and_resample(seq1, seq2)

    dtw_distance, dtw_path = compute_dtw_distance(seq1, seq2)
    print(f"DTW Distance Score: {round(dtw_distance, 2)}")

    # Each plot function saves its figure under "outputs/" by default, which is
    # where generate_gpt_conclusion_from_dtw looks for them.
    plot_time_series_variation(seq1, seq2, dtw_path)
    plot_alignment_heatmap_with_values(seq1, seq2, dtw_path)
    plot_alignment_path_scatter(dtw_path)
    plot_frame_match_trace(dtw_path)

    generate_gpt_conclusion_from_dtw(dtw_score=dtw_distance, output_dir=OUTPUT_DIR)

