In [None]:
import pandas as pd
import numpy as np
import ast
import cv2
import matplotlib.pyplot as plt

from keypoint_cleaner import assign_fencer_labels, interpolate_frames
from data_visualization import show_stats, plot_poses, draw_random_action, draw_random_window

# Input: filtered action annotations, loaded with pd.read_csv later in this notebook.
PATH_ACTIONS_FILTERED               = "../Data/tmp/actions_filtered.csv"
# Input: raw keypoint table, loaded with pd.read_csv later in this notebook.
PATH_KEYPOINTS                      = "../Data/Unprocessed/keypoints.csv"
# Output: the interpolated keypoints are written here by the last cell.
PATH_KEYPOINTS_FILTERED             = "../Data/tmp/keypoints_filtered.csv"

# Rows whose mean keypoint confidence (computed further down) is below this
# value are reported as low confidence and left out before interpolation.
LOW_CONFIDENCE_THRESHOLD            = 0.7

In [None]:
# Skeleton connectivity: every pair names two keypoint indices that
# plot_skeleton joins with a line segment.
EDGES = [
    # links among keypoints 0-4 and down to 5/6
    # (presumably the head/face points -- confirm against the pose model's keypoint order)
    (0, 1),
    (1, 3),
    (3, 5),
    (1, 2),
    (0, 2),
    (2, 4),
    (4, 6),
    # left arm
    (5, 7),
    (7, 9),
    # right arm
    (6, 8),
    (8, 10),
    # shoulders
    (5, 6),
    # hips
    (11, 12),
    # torso
    (5, 11),
    (6, 12),
    # left leg
    (11, 13),
    (13, 15),
    # right leg
    (12, 14),
    (14, 16),
]

In [None]:
def plot_skeleton(frame, keypoints, color=(0,1,0), alpha=0.8):
    """Draw a skeleton on the current matplotlib axes.

    Parameters
    ----------
    frame
        Not used by the drawing code; kept so existing call sites keep working.
    keypoints
        Sequence of (x, y) pairs -- a list of tuples or an (N, 2) array.
        Index ``i`` is the keypoint referenced by ``EDGES``.
    color
        Matplotlib colour used for both the bones and the joints.
    alpha
        Opacity of the bone lines.
    """
    # Coerce once so that list-of-tuples input works for the column slicing
    # below exactly like an ndarray does.
    points = np.asarray(keypoints, dtype=float)
    if points.size == 0:
        return  # nothing to draw

    n_points = len(points)
    for i, j in EDGES:
        # Skip bones that reference a keypoint this pose does not have.
        if i >= n_points or j >= n_points:
            continue
        x1, y1 = points[i]
        x2, y2 = points[j]
        plt.plot([x1, x2], [y1, y2], color=color, alpha=alpha, linewidth=2)
    plt.scatter(points[:, 0], points[:, 1], color=color, s=20)

def visualize_actions(video_path, frame_data, frame_ranges_for_file):
    """Visualize only frames where a fencer has an action.

    Parameters
    ----------
    video_path
        Path handed to ``cv2.VideoCapture``.
    frame_data
        Mapping of frame index -> dict. The dict may contain ``"poses"``
        (a list with one keypoint array per fencer) and ``"roi"``
        (``x1, y1, x2, y2``).
    frame_ranges_for_file
        Mapping whose values are iterables of ``(start, end)`` frame ranges;
        both ends are inclusive.

    One matplotlib figure is shown per action frame.
    """
    # Flatten all action frames for lookup
    action_frames = {
        fid
        for ranges in frame_ranges_for_file.values()
        for start, end in ranges
        for fid in range(start, end + 1)
    }
    if not action_frames:
        return  # nothing to show, so do not even open the video

    last_action_frame = max(action_frames)
    colors = [(0,1,0), (1,0,0)]  # left=green, right=red

    cap = cv2.VideoCapture(video_path)
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Frames are decoded sequentially so that `fid` stays in sync with the
        # stream; stop as soon as the last action frame has been handled.
        for fid in range(min(frame_count, last_action_frame + 1)):
            ret, frame = cap.read()
            if not ret:
                break

            if fid not in action_frames:
                continue  # skip frames outside action ranges

            frame_info = frame_data.get(fid, {})
            poses = frame_info.get("poses", [])
            roi = frame_info.get("roi", None)

            # Convert BGR -> RGB for matplotlib
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            fig = plt.figure(figsize=(12,6))
            plt.imshow(frame_rgb)
            plt.axis("off")

            # Draw ROI (explicit emptiness test: truth-testing an ndarray raises)
            if roi is not None and len(roi) > 0:
                x1, y1, x2, y2 = roi
                plt.gca().add_patch(plt.Rectangle((x1, y1), x2-x1, y2-y1,
                                                  fill=False, edgecolor="red", linewidth=2))

            # Draw fencer poses
            if isinstance(poses, list):
                for k, p in enumerate(poses):
                    if isinstance(p, float) or p is None or np.isnan(p).all():
                        continue
                    # Cycle the palette so a third detection cannot raise IndexError.
                    plot_skeleton(frame_rgb, p, color=colors[k % len(colors)])

            plt.title(f"Frame {fid}")
            plt.show()
            plt.close(fig)  # keep memory flat when many frames are shown
    finally:
        # Always hand the video handle back, even if drawing fails.
        cap.release()

def visualize_missing_frames(video_path, frame_data, frame_ranges_for_file):
    """Show frames within action ranges where left or right fencer was missing.

    Parameters
    ----------
    video_path
        Path handed to ``cv2.VideoCapture``.
    frame_data
        Mapping of frame index -> dict. A fencer counts as missing when the
        ``"left_fencer"`` / ``"right_fencer"`` entry is absent or a float NaN.
    frame_ranges_for_file
        Mapping whose values are iterables of ``(start, end)`` frame ranges;
        both ends are inclusive.
    """
    # Flatten all action frames
    action_frames = {
        fid
        for ranges in frame_ranges_for_file.values()
        for start, end in ranges
        for fid in range(start, end + 1)
    }

    cap = cv2.VideoCapture(video_path)
    try:
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        next_position = 0  # index of the frame the next cap.read() would return

        for fid in sorted(f for f in action_frames if 0 <= f < frame_count):
            frame_info = frame_data.get(fid, {})
            left = frame_info.get("left_fencer", np.nan)
            right = frame_info.get("right_fencer", np.nan)

            left_missing = isinstance(left, float) and np.isnan(left)
            right_missing = isinstance(right, float) and np.isnan(right)
            if not (left_missing or right_missing):
                continue

            # Only some frames are read, so the stream has to be positioned on
            # `fid` first; otherwise read() returns whatever frame comes next
            # in the file and the picture does not match the title.
            if fid != next_position:
                cap.set(cv2.CAP_PROP_POS_FRAMES, fid)
            ret, frame = cap.read()
            if not ret:
                break
            next_position = fid + 1

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            fig = plt.figure(figsize=(12,6))
            plt.imshow(frame_rgb)
            plt.axis("off")
            plt.title(f"Missing fencer(s) in frame {fid}")
            plt.show()
            plt.close(fig)  # keep memory flat when many frames are shown
    finally:
        # Always hand the video handle back, even if drawing fails.
        cap.release()


In [None]:
# Load the raw keypoint table. The per-keypoint values arrive spread over
# keypoint_0..keypoint_16 and conf_0..conf_16 columns, which are merged below.
df_keypoints = pd.read_csv(PATH_KEYPOINTS)

def merge_cols(df, old_col_prefix, new_col_name, n_cols=17):
    """Collapse ``<prefix>0 .. <prefix>{n_cols-1}`` into one list-valued column.

    Parameters
    ----------
    df
        Frame holding the numbered columns. It is not modified.
    old_col_prefix
        Common prefix of the numbered columns, e.g. ``"keypoint_"``.
    new_col_name
        Name of the column that receives the per-row list.
    n_cols
        How many numbered columns to merge (defaults to 17).

    Returns
    -------
    A new DataFrame without the numbered columns and with ``new_col_name``
    holding, per row, the list of their values in index order. String cells
    are parsed with ``ast.literal_eval``; other cells are kept as they are.
    """
    source_cols = [old_col_prefix + str(i) for i in range(n_cols)]

    merged = [
        [ast.literal_eval(cell) if isinstance(cell, str) else cell for cell in row]
        for row in df[source_cols].values.tolist()
    ]

    # Build the result on the dropped copy so the caller's frame stays untouched.
    return df.drop(columns=source_cols).assign(**{new_col_name: merged})

# Fold the numbered coordinate and confidence columns into one list column each.
df_keypoints = (
    df_keypoints
    .pipe(merge_cols, "keypoint_", "keypoints")
    .pipe(merge_cols, "conf_", "confidences")
)

In [None]:
# Label every detection with its fencer, keep only the columns used downstream
# and order the rows by file, fencer and frame. The steps are chained (no
# inplace=True) so nothing is modified on a column slice, which can otherwise
# trigger pandas' SettingWithCopyWarning.
df_keypoints_fencers = (
    assign_fencer_labels(df_keypoints)
    .loc[:, ["file", "fencer", "frame", "keypoints", "confidences"]]
    .sort_values(by=["file", "fencer", "frame"])
    .reset_index(drop=True)
)

print(df_keypoints_fencers)

In [None]:
# Load the filtered actions and attach them to the keypoint rows of the same
# file and fencer.
df_filtered = pd.read_csv(PATH_ACTIONS_FILTERED)
df_merged = df_keypoints_fencers.merge(df_filtered, how="left", on=["file", "fencer"])

# Keep only the rows whose frame lies inside the action's inclusive
# [start_frame, end_frame] window; rows without a matching action compare
# against NaN and are dropped as well.
in_action_window = df_merged["frame"].between(df_merged["start_frame"], df_merged["end_frame"])
merged_columns = [
    "file", "fencer", "action_id", "action", "frame",
    "start_frame", "end_frame", "keypoints", "confidences",
]
df_merged = df_merged.loc[in_action_window, merged_columns].reset_index(drop=True)
print(df_merged)

In [None]:
# Visual sanity check on the merged table with a fixed seed for repeatability.
# NOTE(review): the second argument (3) is presumably the number of actions to
# draw -- confirm against data_visualization.draw_random_action.
draw_random_action(df_merged, 3, seed=42)

In [None]:
# Mean confidence per row, ignoring missing (None) entries.
# NOTE(review): the slice starts at index 4, but EDGES suggests that keypoints
# 0-4 form the head group and the body starts at 5 (shoulders). If the intent
# is to exclude the whole head this may need to be [5:] -- confirm before
# changing, since it shifts which frames pass the threshold.
confidences_no_head = df_merged["confidences"].apply(lambda x: x[4:])
confidences = confidences_no_head.apply(lambda x: np.mean([c for c in x if c is not None]))
df_low_confidence = df_merged[confidences < LOW_CONFIDENCE_THRESHOLD]

print(confidences.describe())
print("")
print("Number of low confidence frames: ", len(df_low_confidence))

sample = df_low_confidence.sample(min(10, len(df_low_confidence)), random_state=42)
sample_confidences = confidences.loc[sample.index]

print("")
print(sample_confidences)

# Plot the sampled rows, so the figures correspond to the confidences printed
# just above (the same convention the borderline-confidence cell uses).
if len(sample) > 0:
    plot_poses(sample, 0, len(sample)-1, 10)

In [None]:
# Rows that pass the confidence threshold; these feed the interpolation step.
df_reduced = df_merged[confidences >= LOW_CONFIDENCE_THRESHOLD]

# Inspect the rows that only just pass: mean confidence within this margin
# above the threshold.
borderline_margin = 0.01
# `confidences` is indexed like df_merged; align it to df_reduced explicitly
# instead of relying on pandas reindexing a mismatched boolean mask (which
# emits a "Boolean Series key will be reindexed" warning).
confidences_reduced = confidences.loc[df_reduced.index]
df_min_confidence = df_reduced[confidences_reduced < LOW_CONFIDENCE_THRESHOLD + borderline_margin]

print("Number of low confidence frames: ", len(df_min_confidence))
print("")

sample = df_min_confidence.sample(min(10, len(df_min_confidence)), random_state=42)
sample_confidences = confidences.loc[sample.index]

print(sample_confidences)
if len(sample) > 0:
    plot_poses(sample, 0, len(sample)-1, 10)

In [None]:
# Run the interpolation on the rows that passed the confidence threshold.
# interpolate_frames returns the resulting frame table plus a statistics object.
df_interpolated, stats = interpolate_frames(df_reduced)
# Confidences are not needed past this point.
df_interpolated = df_interpolated.drop(columns=["confidences"])

# Sanity check: the change in row count should equal the rows reported as added
# (interpolated + copied) minus the rows reported as dropped.
# NOTE(review): `stats` is indexed by string keys and rendered with .to_string(),
# so it is presumably a pandas Series -- confirm against keypoint_cleaner.
expected_frame_difference = stats["interpolated_frames"] + stats["copied_frames"] - stats["dropped"]
actual_frame_difference = len(df_interpolated) - len(df_reduced)

print("==== Interpolation Statistics ====")
print(stats.to_string())
print("")

print("Frame count before interpolation: ", len(df_reduced))
print("Frame count after interpolation:  ", len(df_interpolated))
print("")

# The two numbers below are only printed, not asserted; compare them by eye.
print("Expected frame count difference: ", expected_frame_difference)
print("Actual frame count difference:   ", actual_frame_difference)

def keypoints_to_tuples(kp):
    """Return the keypoints as a list of ``(x, y)`` tuples of Python floats.

    ``kp`` is converted to a float array first, so every coordinate comes back
    as a built-in ``float``.
    """
    rows = np.array(kp, dtype=float).tolist()
    return [(x, y) for x, y in rows]

# Store every pose as a list of (x, y) float tuples before exporting.
df_interpolated["keypoints"] = df_interpolated["keypoints"].map(keypoints_to_tuples)

# Fix the column order of the exported table and renumber the rows.
export_columns = [
    "file", "fencer", "action_id", "action", "frame",
    "start_frame", "end_frame", "keypoints",
]
df_interpolated = df_interpolated.loc[:, export_columns].reset_index(drop=True)
df_interpolated.to_csv(PATH_KEYPOINTS_FILTERED, index=False)