In [None]:
#Yolov8
import os
import cv2
import json
from ultralytics import YOLO

# Paths
video_folder = r"E:\study\RPP\project_3_v1\dataset\dataset"  # Input video folder
yolo_output_folder = r"E:\study\RPP\project_3_v1\full\yolo\img"
yolo_json_folder = r"E:\study\RPP\project_3_v1\full\yolo\json"
video_mapping_file = r"E:\study\RPP\project_3_v1\full\yolo\video_mapping.json"  # Save video info

# Ensure output directories exist
os.makedirs(yolo_output_folder, exist_ok=True)
os.makedirs(yolo_json_folder, exist_ok=True)

# Load YOLO model
model = YOLO('yolov8n.pt')

# Frame counter across all videos (global counter)
global_frame_count = 1  

# Dictionary to store video start & end frame mapping`
video_frame_mapping = {}

# Process all videos in the folder
video_files = sorted([f for f in os.listdir(video_folder) if f.endswith(('.mp4', '.avi', '.mov'))])

for video_file in video_files:
    video_path = os.path.join(video_folder, video_file)
    cap = cv2.VideoCapture(video_path)
    
    fps = int(cap.get(cv2.CAP_PROP_FPS))  # Get FPS of video
    frame_interval = max(1, fps // 30)  # Process at max 30 FPS

    frame_count = 0
    start_frame = global_frame_count  # Track where this video's frames start

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # Stop if video ends

        if frame_count % frame_interval == 0:  # Process at 30 FPS
            filename = f"frame_{global_frame_count:05d}.jpg"  # Keeps numbering continuous
            output_path = os.path.join(yolo_output_folder, filename)
            json_path = os.path.join(yolo_json_folder, filename.replace(".jpg", ".json"))

            # Run YOLOv8
            results = model(frame)

            detections = []  # Store bounding boxes
            for result in results:
                for box in result.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])  # Bounding box
                    conf = box.conf[0].item()

                    if conf > 0.5:  # Confidence threshold
                        detections.append({"bbox": [x1, y1, x2, y2], "confidence": conf})
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Save detections JSON with video info
            json_data = {
                "video_name": video_file,  # Store original video name
                "frame_number": global_frame_count,
                "detections": detections
            }

            with open(json_path, "w") as f:
                json.dump(json_data, f, indent=4)

            # Save frame
            cv2.imwrite(output_path, frame)
            print(f"Processed: {filename} from {video_file}")

            global_frame_count += 1  # Maintain continuous numbering

        frame_count += 1

    cap.release()

    # Store video start and end frame mapping
    video_frame_mapping[video_file] = {"start_frame": start_frame, "end_frame": global_frame_count - 1}

# Save video-to-frame mapping
with open(video_mapping_file, "w") as f:
    json.dump(video_frame_mapping, f, indent=4)

print("YOLO detection completed. Processed images saved in:", yolo_output_folder)
print("Video-to-frame mapping saved in:", video_mapping_file)



0: 384x640 8 persons, 3 benchs, 1 dining table, 117.1ms
Speed: 5.0ms preprocess, 117.1ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)
Processed: frame_00001.jpg from body_movement_01.mp4

0: 384x640 8 persons, 3 benchs, 1 dining table, 101.5ms
Speed: 5.3ms preprocess, 101.5ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)
Processed: frame_00002.jpg from body_movement_01.mp4

0: 384x640 9 persons, 4 benchs, 1 dining table, 95.3ms
Speed: 3.0ms preprocess, 95.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)
Processed: frame_00003.jpg from body_movement_01.mp4

0: 384x640 7 persons, 3 benchs, 1 dining table, 89.3ms
Speed: 3.9ms preprocess, 89.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)
Processed: frame_00004.jpg from body_movement_01.mp4

0: 384x640 8 persons, 4 benchs, 1 dining table, 118.3ms
Speed: 3.1ms preprocess, 118.3ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)
Processed: frame_

In [None]:
#Movenet Thunder
import os
import cv2
import json
import numpy as np
import tensorflow_hub as hub
import chardet
import tensorflow as tf

# Paths
YOLO_JSON_FOLDER = r"/content/drive/MyDrive/head_movement/yolo_json"
YOLO_IMAGES_FOLDER = r"/content/drive/MyDrive/head_movement/yolo_output"
MOVENET_OUTPUT_FOLDER = r"/content/drive/MyDrive/head_movement/movenet_headm_out"
MOVENET_JSON_FOLDER = r"/content/drive/MyDrive/head_movement/movenet_headm_json"

# Ensure output directories exist
os.makedirs(MOVENET_OUTPUT_FOLDER, exist_ok=True)
os.makedirs(MOVENET_JSON_FOLDER, exist_ok=True)

# Load MoveNet Thunder model
movenet = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
movenet_fn = movenet.signatures["serving_default"]

def run_movenet(image):
    img_resized = cv2.resize(image, (256, 256))  # Resize image
    img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)  # Convert to RGB
    img_tensor = tf.convert_to_tensor(img_rgb, dtype=tf.int32)  # Convert to tf.int32
    img_tensor = tf.expand_dims(img_tensor, axis=0)  # Add batch dimension

    outputs = movenet_fn(input=img_tensor)
    keypoints = outputs["output_0"].numpy().reshape(17, 3)  # Extract keypoints
    return keypoints


# Process each YOLO JSON file
yolo_json_files = sorted(os.listdir(YOLO_JSON_FOLDER))

for json_file in yolo_json_files:
    json_path = os.path.join(YOLO_JSON_FOLDER, json_file)

    if not json_file.endswith(".json") or os.path.isdir(json_path):
        print(f"Skipping non-JSON or directory: {json_file}")
        continue

    # Detect encoding before reading JSON
    with open(json_path, "rb") as f:
        raw_data = f.read()
        detected_encoding = chardet.detect(raw_data)["encoding"]

    with open(json_path, "r", encoding=detected_encoding) as f:
        yolo_data = json.load(f)

    image_name = json_file.replace(".json", ".jpg")
    image_path = os.path.join(YOLO_IMAGES_FOLDER, image_name)

    if not os.path.exists(image_path):
        print(f"Skipping {image_name}, image not found.")
        continue

    image = cv2.imread(image_path)
    image_height, image_width = image.shape[:2]
    video_name = yolo_data.get("video_name", "unknown")

    output_json = {
        "video_name": video_name,
        "frame_number": yolo_data.get("frame_number", -1),
        "detections": []
    }

    # Process each detected person
    for idx, detection in enumerate(yolo_data["detections"]):
        x1, y1, x2, y2 = detection["bbox"]

        # Clamp bounding box values within image size
        x1 = max(0, min(x1, image_width - 1))
        y1 = max(0, min(y1, image_height - 1))
        x2 = max(0, min(x2, image_width - 1))
        y2 = max(0, min(y2, image_height - 1))

        if x1 >= x2 or y1 >= y2:
            print(f"Skipping invalid crop in {image_name} due to incorrect bbox: ({x1}, {y1}, {x2}, {y2})")
            continue

        person_crop = image[y1:y2, x1:x2]

        if person_crop.shape[0] == 0 or person_crop.shape[1] == 0:
            print(f"Skipping empty crop in {image_name} (after clamping bbox).")
            continue

        keypoints = run_movenet(person_crop)

        keypoints_dict = {
            "bbox": detection["bbox"],
            "confidence": detection["confidence"],
            "keypoints": {
                key: {"x": float(pt[1]), "y": float(pt[0]), "confidence": float(pt[2])}
                for key, pt in zip([
                    "nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", "right_shoulder",
                    "left_elbow", "right_elbow", "left_wrist", "right_wrist", "left_hip", "right_hip", "left_knee",
                    "right_knee", "left_ankle", "right_ankle"
                ], keypoints)
            }
        }
        output_json["detections"].append(keypoints_dict)

        # Draw keypoints on image
        for pt in keypoints:
            x = int(pt[1] * (x2 - x1) + x1)
            y = int(pt[0] * (y2 - y1) + y1)
            conf = pt[2]

            # Ensure keypoints are within the image bounds
            x = max(0, min(x, image_width - 1))
            y = max(0, min(y, image_height - 1))

            if conf > 0.3:
                cv2.circle(image, (x, y), 4, (0, 0, 255), -1)

    # Save JSON output
    json_output_path = os.path.join(MOVENET_JSON_FOLDER, json_file)
    with open(json_output_path, "w") as f:
        json.dump(output_json, f, indent=4)

    # Save annotated image
    output_image_path = os.path.join(MOVENET_OUTPUT_FOLDER, image_name)
    cv2.imwrite(output_image_path, image)

    print(f"Processed: {image_name} (Saved JSON + Keypoint Image)")


In [None]:
#Sort
import os
import json
import cv2
import importlib.util
import numpy as np

# Load SORT Manually
spec = importlib.util.spec_from_file_location("sort", "E:/study/RPP/project1/sort/sort.py")
sort_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(sort_module)
Sort = sort_module.Sort

print("SORT loaded successfully!")

# Initialize SORT tracker
tracker = Sort()

# Paths
movenet_json_dir = r"E:\study\RPP\project1\movenet\paper_passing\paper_passing_json"
movenet_image_dir = r"E:\study\RPP\project1\movenet\paper_passing\paper_passing_key"
tracking_output_dir = r"E:\study\RPP\project1\tracker\paper_passing\id_json"
tracking_image_dir = r"E:\study\RPP\project1\tracker\paper_passing\id_img"

# Ensure output directories exist
os.makedirs(tracking_output_dir, exist_ok=True)
os.makedirs(tracking_image_dir, exist_ok=True)

# Function to calculate IoU (Intersection over Union)
def iou(box1, box2):
    x1, y1, x2, y2 = box1
    x1b, y1b, x2b, y2b = box2
    
    xi1 = max(x1, x1b)
    yi1 = max(y1, y1b)
    xi2 = min(x2, x2b)
    yi2 = min(y2, y2b)
    
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2b - x1b) * (y2b - y1b)
    
    union_area = box1_area + box2_area - inter_area
    return inter_area / union_area if union_area > 0 else 0

# Get sorted list of JSON files (based on frame number)
json_files = sorted(os.listdir(movenet_json_dir), key=lambda x: int(x.split('_')[-1].split('.')[0]))

# Process each frame
for json_file in json_files:
    json_path = os.path.join(movenet_json_dir, json_file)

    with open(json_path, "r") as f:
        detections = json.load(f)  # Load MoveNet keypoints

    # Extract video name and frame number
    video_name = detections.get("video_name", "unknown_video")
    frame_number = detections.get("frame_number", -1)

    # Extract bounding boxes and keypoints
    bbox_detections = []
    keypoint_data = []
    
    for person in detections.get("detections", []):  # Adjusted to match MoveNet format
        bbox = person["bbox"]
        confidence = person["confidence"]
        keypoints = person["keypoints"]

        bbox_detections.append([bbox[0], bbox[1], bbox[2], bbox[3], confidence])
        keypoint_data.append({"bbox": bbox, "confidence": confidence, "keypoints": keypoints})

    # Convert to NumPy array (if empty, pass an empty array)
    bbox_detections = np.array(bbox_detections) if bbox_detections else np.empty((0, 5))

    # Track objects using SORT
    trackers = tracker.update(bbox_detections)

    # Prepare tracking results
    tracking_results = []
    
    for track in trackers:
        track_id = int(track[4])  # SORT assigns an ID
        bbox = [int(track[0]), int(track[1]), int(track[2]), int(track[3])]

        # Find closest match using IoU
        best_match = None
        best_iou = 0
        for person in keypoint_data:
            iou_score = iou(bbox, person["bbox"])
            if iou_score > best_iou and iou_score > 0.5:  # IoU threshold
                best_match = person
                best_iou = iou_score

        tracking_results.append({
            "track_id": track_id,
            "bbox": bbox,
            "confidence": best_match["confidence"] if best_match else 0,
            "keypoints": best_match["keypoints"] if best_match else {}
        })

    # Save tracking JSON
    output_json_path = os.path.join(tracking_output_dir, json_file)
    with open(output_json_path, "w") as f:
        json.dump({
            "video_name": video_name,
            "frame_number": frame_number,
            "tracked_people": tracking_results
        }, f, indent=4)

    # Load image
    image_path = os.path.join(movenet_image_dir, json_file.replace(".json", ".jpg"))
    if os.path.exists(image_path):
        image = cv2.imread(image_path)

        # Draw bounding boxes & IDs
        for person in tracking_results:
            x1, y1, x2, y2 = person["bbox"]
            person_id = person["track_id"]
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(image, f"ID: {person_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        # Save annotated image
        output_image_path = os.path.join(tracking_image_dir, json_file.replace(".json", ".jpg"))
        cv2.imwrite(output_image_path, image)
    else:
        print(f" Warning: Image not found for {json_file}, skipping image processing.")

print("Tracking completed successfully.")


In [None]:
#Finding Distance
import json
import os
import math
import cv2

# Paths
sort_json_dir = r"E:\study\RPP\project_3\tracker\sort_json"  # Folder with JSON files
movenet_img_dir = r"E:\study\RPP\project_3\tracker\sort_img"  # Folder with images
updated_json_dir = r"E:\study\RPP\project_3\distance\img_json"  # Folder to save updated JSON files
visualized_output_dir = r"E:\study\RPP\project_3\distance\img_dist"  # Folder to save visualized images

# Ensure output directories exist
os.makedirs(updated_json_dir, exist_ok=True)
os.makedirs(visualized_output_dir, exist_ok=True)

# Function to calculate Euclidean distance
def euclidean_distance(p1, p2):
    return math.sqrt((p2["x"] - p1["x"])**2 + (p2["y"] - p1["y"])**2)

# Function to convert normalized keypoints relative to bounding box
def denormalize_bbox(x, y, bbox):
    x1, y1, x2, y2 = bbox  # Bounding box coordinates
    abs_x = int(x * (x2 - x1) + x1)  # Scale x to bounding box
    abs_y = int(y * (y2 - y1) + y1)  # Scale y to bounding box
    return abs_x, abs_y

# Function to draw a line and label it with distance
def draw_distance(img, p1, p2, distance, bbox, color=(0, 255, 0)):
    x1, y1 = denormalize_bbox(p1["x"], p1["y"], bbox)
    x2, y2 = denormalize_bbox(p2["x"], p2["y"], bbox)
    
    cv2.line(img, (x1, y1), (x2, y2), color, 2)
    mid_x, mid_y = (x1 + x2) // 2, (y1 + y2) // 2
    cv2.putText(img, f"{distance:.2f}", (mid_x, mid_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

# Process all JSON files
json_files = sorted([f for f in os.listdir(sort_json_dir) if f.endswith(".json")])

for json_file in json_files:
    json_path = os.path.join(sort_json_dir, json_file)
    image_name = json_file.replace(".json", ".jpg")
    image_path = os.path.join(movenet_img_dir, image_name)
    updated_json_path = os.path.join(updated_json_dir, json_file)
    output_image_path = os.path.join(visualized_output_dir, image_name)

    # Check if corresponding image exists
    if not os.path.exists(image_path):
        print(f"Skipping {json_file}, image not found.")
        continue

    # Load JSON data
    with open(json_path, "r") as f:
        data = json.load(f)

    # Load image
    image = cv2.imread(image_path)

    # Process each tracked person separately
    for person in data["tracked_people"]:
        keypoints = person["keypoints"]
        bbox = person["bbox"]  # Bounding box for the person

        # Compute distances
        distances = {
            "nose_to_left_shoulder": euclidean_distance(keypoints["nose"], keypoints["left_shoulder"]),
            "nose_to_right_shoulder": euclidean_distance(keypoints["nose"], keypoints["right_shoulder"]),
            "left_shoulder_to_left_elbow": euclidean_distance(keypoints["left_shoulder"], keypoints["left_elbow"]),
            "right_shoulder_to_right_elbow": euclidean_distance(keypoints["right_shoulder"], keypoints["right_elbow"]),
            "left_elbow_to_left_wrist": euclidean_distance(keypoints["left_elbow"], keypoints["left_wrist"]),
            "right_elbow_to_right_wrist": euclidean_distance(keypoints["right_elbow"], keypoints["right_wrist"]),
            "left_shoulder_to_left_hip": euclidean_distance(keypoints["left_shoulder"], keypoints["left_hip"]),
            "right_shoulder_to_right_hip": euclidean_distance(keypoints["right_shoulder"], keypoints["right_hip"]),
            "left_hip_to_left_knee": euclidean_distance(keypoints["left_hip"], keypoints["left_knee"]),
            "right_hip_to_right_knee": euclidean_distance(keypoints["right_hip"], keypoints["right_knee"]),
            "left_knee_to_left_ankle": euclidean_distance(keypoints["left_knee"], keypoints["left_ankle"]),
            "right_knee_to_right_ankle": euclidean_distance(keypoints["right_knee"], keypoints["right_ankle"])
        }

        # Add distances to the JSON data
        person["distances"] = distances

        # Define joint pairs for visualization
        joint_pairs = [
            ("nose", "left_shoulder"),
            ("nose", "right_shoulder"),
            ("left_shoulder", "left_elbow"),
            ("right_shoulder", "right_elbow"),
            ("left_elbow", "left_wrist"),
            ("right_elbow", "right_wrist"),
            ("left_shoulder", "left_hip"),
            ("right_shoulder", "right_hip"),
            ("left_hip", "left_knee"),
            ("right_hip", "right_knee"),
            ("left_knee", "left_ankle"),
            ("right_knee", "right_ankle")
        ]

        # Draw lines and distances per person
        for joint1, joint2 in joint_pairs:
            if joint1 in keypoints and joint2 in keypoints and f"{joint1}_to_{joint2}" in distances:
                draw_distance(image, keypoints[joint1], keypoints[joint2], distances[f"{joint1}_to_{joint2}"], bbox)

    # Save the updated JSON file
    with open(updated_json_path, "w") as f:
        json.dump(data, f, indent=4)

    # Save the visualized image
    cv2.imwrite(output_image_path, image)

    print(f"Processed {json_file} → Updated JSON & Image Saved")

print("All frames processed successfully! JSONs updated and images saved.")


In [None]:
#Finding Angle
import os
import numpy as np
import json
import cv2
import matplotlib.pyplot as plt

# Function to calculate the angle between two vectors
def calculate_angle(v1, v2):
    dot_product = np.dot(v1, v2)
    magnitude_v1 = np.linalg.norm(v1)
    magnitude_v2 = np.linalg.norm(v2)
    angle_radians = np.arccos(dot_product / (magnitude_v1 * magnitude_v2))
    angle_degrees = np.degrees(angle_radians)
    return angle_degrees

# Input and output directories
input_json_dir = r"E:\study\RPP\project_3\distance\angle_1-j"  # Directory containing input JSON files
input_image_dir = r"C:\Users\navee\Downloads\testing_input\body_movement\sort_img"  # Directory containing input images
output_json_dir = r"E:\study\RPP\project_3\distance\angle_json"  # Directory to save updated JSON files
output_viz_dir = r"E:\study\RPP\project_3\distance\angle_img" # Directory to save visualization images

# Create output directories if they don't exist
os.makedirs(output_json_dir, exist_ok=True)
os.makedirs(output_viz_dir, exist_ok=True)

# Process each JSON file in the input directory
for json_file in os.listdir(input_json_dir):
    if json_file.endswith(".json"):
        # Load the JSON file
        json_path = os.path.join(input_json_dir, json_file)
        with open(json_path, 'r') as f:
            data = json.load(f)

        # Load the corresponding image
        frame_name = json_file.replace(".json", ".jpg")  # Assuming image names match JSON names
        image_path = os.path.join(input_image_dir, frame_name)
        image = cv2.imread(image_path)

        # Check if the image was loaded successfully
        if image is None:
            print(f"Error: Unable to load image at {image_path}. Please check the file path and ensure the image exists.")
            continue

        # Convert to RGB for matplotlib
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Get image dimensions
        height, width, _ = image.shape

        # Create a single plot for all persons
        plt.figure(figsize=(10, 10))
        plt.imshow(image)

        # Process each tracked person
        for person in data['tracked_people']:
            keypoints = person['keypoints']
            bbox = person['bbox']  # Bounding box: [x1, y1, x2, y2]
            
            # Extract bounding box coordinates
            x1, y1, x2, y2 = bbox
            bbox_width = x2 - x1
            bbox_height = y2 - y1
            
            # Extract relevant key points (normalized within the bounding box)
            left_shoulder = (keypoints['left_shoulder']['x'], keypoints['left_shoulder']['y'])
            right_shoulder = (keypoints['right_shoulder']['x'], keypoints['right_shoulder']['y'])
            left_elbow = (keypoints['left_elbow']['x'], keypoints['left_elbow']['y'])
            right_elbow = (keypoints['right_elbow']['x'], keypoints['right_elbow']['y'])
            left_wrist = (keypoints['left_wrist']['x'], keypoints['left_wrist']['y'])
            right_wrist = (keypoints['right_wrist']['x'], keypoints['right_wrist']['y'])
            
            # Map normalized key points to image coordinates
            left_shoulder_img = (int(x1 + left_shoulder[0] * bbox_width), int(y1 + left_shoulder[1] * bbox_height))
            right_shoulder_img = (int(x1 + right_shoulder[0] * bbox_width), int(y1 + right_shoulder[1] * bbox_height))
            left_elbow_img = (int(x1 + left_elbow[0] * bbox_width), int(y1 + left_elbow[1] * bbox_height))
            right_elbow_img = (int(x1 + right_elbow[0] * bbox_width), int(y1 + right_elbow[1] * bbox_height))
            left_wrist_img = (int(x1 + left_wrist[0] * bbox_width), int(y1 + left_wrist[1] * bbox_height))
            right_wrist_img = (int(x1 + right_wrist[0] * bbox_width), int(y1 + right_wrist[1] * bbox_height))
            
            # Calculate the vectors (in image coordinates)
            v32 = np.array(right_elbow_img) - np.array(right_shoulder_img)  # Vector from right shoulder to right elbow
            v43 = np.array(right_wrist_img) - np.array(right_elbow_img)  # Vector from right elbow to right wrist
            v65 = np.array(left_elbow_img) - np.array(left_shoulder_img)  # Vector from left shoulder to left elbow
            v76 = np.array(left_wrist_img) - np.array(left_elbow_img)  # Vector from left elbow to left wrist
            
            # Reference vectors (in image coordinates)
            v90 = np.array([0, 100])  # 90° to the horizontal plane (vertical)
            v180 = np.array([-100, 0])  # 180° to the horizontal plane (left)
            v0 = np.array([100, 0])  # 0° (horizontal plane, right)
            
            # Calculate the angles
            right_shoulder_elbow_angle = calculate_angle(v32, v180)  # Angle at right shoulder
            right_elbow_wrist_angle = calculate_angle(v43, v90)  # Angle at right elbow
            left_shoulder_elbow_angle = calculate_angle(v65, v0)  # Angle at left shoulder
            left_elbow_wrist_angle = calculate_angle(v76, v90)  # Angle at left elbow
            
            # Store the angles in the JSON with descriptive names
            person['angles'] = {
                "right_shoulder_elbow_angle": right_shoulder_elbow_angle,
                "right_elbow_wrist_angle": right_elbow_wrist_angle,
                "left_shoulder_elbow_angle": left_shoulder_elbow_angle,
                "left_elbow_wrist_angle": left_elbow_wrist_angle
            }
            
            # Plot key points (without labels)
            plt.scatter(*left_shoulder_img, color='blue')
            plt.scatter(*right_shoulder_img, color='blue')
            plt.scatter(*left_elbow_img, color='green')
            plt.scatter(*right_elbow_img, color='green')
            plt.scatter(*left_wrist_img, color='red')
            plt.scatter(*right_wrist_img, color='red')
            
            # Draw vectors (without labels)
            plt.arrow(*right_shoulder_img, *v32, color='orange', width=2)
            plt.arrow(*right_elbow_img, *v43, color='purple', width=2)
            plt.arrow(*left_shoulder_img, *v65, color='cyan', width=2)
            plt.arrow(*left_elbow_img, *v76, color='magenta', width=2)
            
            # Draw reference vectors (without labels)
            plt.arrow(right_shoulder_img[0], right_shoulder_img[1], *v180, color='black', width=2)
            plt.arrow(right_elbow_img[0], right_elbow_img[1], *v90, color='black', width=2)
            plt.arrow(left_shoulder_img[0], left_shoulder_img[1], *v0, color='black', width=2)

        # Save the visualization image
        viz_output_path = os.path.join(output_viz_dir, frame_name)
        plt.savefig(viz_output_path)
        plt.close()

        # Save the updated JSON file
        json_output_path = os.path.join(output_json_dir, json_file)
        with open(json_output_path, 'w') as f:
            json.dump(data, f, indent=4)

        print(f"Processed {json_file} and saved results.")

In [None]:
#Prepare DNN input
import json
import csv
import os
import re  # Import the regex module for pattern matching

# Define the directory containing JSON files
json_dir = r"E:\study\RPP\project_3_v1\distance\angle_json" # Update this path to your JSON directory

# Define the output CSV file path
output_csv = r'E:\study\RPP\project_3_v1\distance\output.csv'  # Update this path to your desired CSV location

# Define the configuration for track_id and frame_number ranges (labeled by video_name)
config = {
    12: [range(34, 39)],
    14: [range(50, 52)],
    18: [range(77, 85)],
    4: [range(233, 332)],
    66: [range(415, 480)],
    191: [range(561, 581)],
    206: [range(585, 602)],
    219: [range(653, 720)],
    356: [range(806, 815)],
    365: [range(823, 827)],
    389: [range(854, 866)],
    431: [range(922, 932), range(935, 959), range(1014, 1051), range(1104, 1139)],
    507: [range(1231, 1244), range(1262, 1276)],
    558: [range(1284, 1288)],
    580: [range(1343, 1375)],
    650: [range(1431, 1465)]
}

# Define the configuration for ranges labeled as 'normal'
normal_config = {
    # Example: Add ranges for frames labeled as 'normal'
    4: [range(1, 30)],  # Example range for track_id 100
    3: [range(200, 220)],
    71: [range(387, 405)],
    432: [range(930, 970)],
    5:[range(1,10)],
    3:[range(1,40)],
    6:[range(187,210)],
    6:[range(361,367)],
    66:[range(377,387)]
}

# Define the CSV columns
columns = ['video_name', 'frame_number', 'track_id', 'bbox', 'keypoints', 'distances', 'angles', 'label']

# Debug: Indicate CSV creation
print("Creating CSV file...")

# Open the CSV file for writing
with open(output_csv, 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=columns)
    writer.writeheader()

    # Debug: Count total JSON files
    total_files = len([f for f in os.listdir(json_dir) if f.endswith('.json')])
    print(f"Total JSON files to process: {total_files}")

    # Loop through all JSON files in the directory
    for json_file in os.listdir(json_dir):
        if json_file.endswith('.json'):  # Process only JSON files
            json_path = os.path.join(json_dir, json_file)

            # Debug: Indicate loading JSON file
            print(f"Loading JSON file: {json_file}")

            try:
                # Load the JSON data
                with open(json_path, 'r') as file:
                    data = json.load(file)

                # Debug: JSON file loaded
                print(f"JSON file {json_file} loaded successfully.")

                # Extract the video_name
                video_name = data['video_name']

                # Check if the video_name contains numbering (e.g., _01, _02, etc.) and optional (1), (2), etc.
                if re.match(r'.+_\d+( \(\d+\))?\.mp4$', video_name):
                    # Extract the label by removing the numbering, (1), (2), etc., and .mp4
                    label = re.sub(r'_\d+( \(\d+\))?\.mp4$', '', video_name)
                else:
                    # Skip this file if the video_name does not contain numbering
                    print(f"Skipping file {json_file} because video_name does not contain numbering.")
                    continue

                # Get the frame_number from the JSON file and convert it to an integer
                frame_number = int(data['frame_number'])  # Convert frame_number to integer
                print(f"Processing frame_number: {frame_number}")

                # Loop through each tracked person in the JSON file
                for person in data['tracked_people']:
                    track_id = person['track_id']
                    print(f"Checking frame_number {frame_number} for track_id {track_id}")

                    # Check if the track_id is in the normal_config (labeled as 'normal')
                    if track_id in normal_config:
                        # Loop through all ranges for this track_id in normal_config
                        for frame_range in normal_config[track_id]:
                            # Check if the frame_number is within the current range
                            if frame_number >= frame_range.start and frame_number < frame_range.stop:
                                # Prepare the row data with label 'normal'
                                row = {
                                    'video_name': video_name,
                                    'frame_number': frame_number,
                                    'track_id': track_id,
                                    'bbox': person['bbox'],
                                    'keypoints': {kp: {'x': person['keypoints'][kp]['x'], 'y': person['keypoints'][kp]['y']} for kp in person['keypoints']},
                                    'distances': person['distances'],
                                    'angles': person['angles'],
                                    'label': 'normal'  # Use 'normal' as the label
                                }

                                # Write the row to the CSV
                                writer.writerow(row)
                                print(f"Data written for frame_number {frame_number}, track_id {track_id}, range {frame_range} (labeled as 'normal')")

                    # Check if the track_id is in the config (labeled by video_name)
                    if track_id in config:
                        # Loop through all ranges for this track_id in config
                        for frame_range in config[track_id]:
                            # Check if the frame_number is within the current range
                            if frame_number >= frame_range.start and frame_number < frame_range.stop:
                                # Prepare the row data with label derived from video_name
                                row = {
                                    'video_name': video_name,
                                    'frame_number': frame_number,
                                    'track_id': track_id,
                                    'bbox': person['bbox'],
                                    'keypoints': {kp: {'x': person['keypoints'][kp]['x'], 'y': person['keypoints'][kp]['y']} for kp in person['keypoints']},
                                    'distances': person['distances'],
                                    'angles': person['angles'],
                                    'label': label  # Use the label derived from video_name
                                }

                                # Write the row to the CSV
                                writer.writerow(row)
                                print(f"Data written for frame_number {frame_number}, track_id {track_id}, range {frame_range} (labeled as '{label}')")

            except json.JSONDecodeError:
                print(f"Skipping corrupted file: {json_file}")
                continue

            # Debug: Indicate JSON file processed
            print(f"Processed JSON file: {json_file}")

    # Debug: CSV file saved
    print("CSV file saved successfully.")

In [None]:
#Train using 500 epoch
# -------------------------------
# Install and Import Libraries
# -------------------------------
import os
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# -------------------------------
# Check for GPU
# -------------------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# -------------------------------
# Load Data
# -------------------------------
file_path = '/content/drive/MyDrive/output.csv'
data = pd.read_csv(file_path)

# Convert distances and angles from strings to dictionaries
data['distances'] = data['distances'].apply(eval)
data['angles'] = data['angles'].apply(eval)

# -------------------------------
# Prepare Data
# -------------------------------
# Flatten distances and angles into a single feature vector
features = data['distances'].apply(lambda x: list(x.values())) + data['angles'].apply(lambda x: list(x.values()))
X = torch.tensor(features.tolist(), dtype=torch.float32).to(device)

# Encode labels into numerical values
label_encoder = LabelEncoder()
y = torch.tensor(label_encoder.fit_transform(data['label']), dtype=torch.long).to(device)

# Split data into training and testing sets (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

#  Print dataset size details
print(f"\nTotal dataset size: {len(data)}")
print(f"Training set size: {len(X_train)}")
print(f"Validation set size: {len(X_test)}\n")

# -------------------------------
# Define DNN Model
# -------------------------------
class DNNClassifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(DNNClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)  # No softmax (CrossEntropyLoss applies softmax internally)
        return x

# -------------------------------
# Initialize Model
# -------------------------------
input_size = X_train.shape[1]
num_classes = len(label_encoder.classes_)
model = DNNClassifier(input_size, num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# -------------------------------
# Early Stopping Configuration
# -------------------------------
num_epochs = 500
patience = 10
best_loss = float('inf')
counter = 0

# Trackers for plotting graphs
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

# -------------------------------
# Training Loop with Early Stopping
# -------------------------------
for epoch in range(num_epochs):
    # Training
    model.train()
    outputs = model(X_train)
    loss = criterion(outputs, y_train)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Track training loss and accuracy
    train_losses.append(loss.item())
    train_accuracy = accuracy_score(y_train.cpu(), torch.argmax(outputs, dim=1).cpu())
    train_accuracies.append(train_accuracy)

    # Validation
    model.eval()
    with torch.no_grad():
        y_pred = model(X_test)
        val_loss = criterion(y_pred, y_test).item()
        val_losses.append(val_loss)
        val_accuracy = accuracy_score(y_test.cpu(), torch.argmax(y_pred, dim=1).cpu())
        val_accuracies.append(val_accuracy)
    
    # Logging
    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}, Validation Loss: {val_loss:.4f}, Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}')

    # Early Stopping Logic
    if val_loss < best_loss:
        best_loss = val_loss
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print(f"\nEarly stopping at epoch {epoch + 1} due to no improvement in validation loss.")
            break

# -------------------------------
# Save Model After Training
# -------------------------------
output_dir = '/content/drive/MyDrive/results'
os.makedirs(output_dir, exist_ok=True)
model_path = os.path.join(output_dir, 'dnn_model.pth')
torch.save(model.state_dict(), model_path)
print(f'\nModel saved to {model_path}')

# -------------------------------
# Evaluation
# -------------------------------
model.eval()
with torch.no_grad():
    y_pred = model(X_test)
    y_pred_classes = torch.argmax(y_pred, dim=1)

# Calculate accuracy and classification report
accuracy = accuracy_score(y_test.cpu(), y_pred_classes.cpu())
report = classification_report(y_test.cpu(), y_pred_classes.cpu(), target_names=label_encoder.classes_)

# -------------------------------
# Print Classification Report
# -------------------------------
print("\n=== Classification Report ===\n")
print(report)
print(f'Validation Accuracy: {accuracy * 100:.2f}%')

# -------------------------------
# Save Graphs and Results
# -------------------------------

# Save Confusion Matrix
conf_matrix = confusion_matrix(y_test.cpu(), y_pred_classes.cpu())
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, cmap="Blues", fmt='d', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.savefig(os.path.join(output_dir, 'confusion_matrix.png'))
print('Confusion matrix saved.')

# Save Training and Validation Accuracy Plot
plt.figure(figsize=(8, 6))
plt.plot(range(1, len(train_accuracies) + 1), train_accuracies, label='Train Accuracy', marker='o')
plt.plot(range(1, len(val_accuracies) + 1), val_accuracies, label='Validation Accuracy', marker='o')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid()
plt.savefig(os.path.join(output_dir, 'model_accuracy.png'))
print('Model accuracy plot saved.')

# Save Training and Validation Loss Plot
plt.figure(figsize=(8, 6))
plt.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss', marker='o')
plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss', marker='o')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.savefig(os.path.join(output_dir, 'model_loss.png'))
print('Model loss plot saved.')

# -------------------------------
# Final Summary
# -------------------------------
print("\nTraining Complete!")
print(f"Validation Accuracy: {accuracy * 100:.2f}%")
print(f"Results saved to: {output_dir}")


In [None]:
#Train using epoch_100
# -------------------------------
# Install and Import Libraries
# -------------------------------
import os
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# -------------------------------
# Check for GPU
# -------------------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

# -------------------------------
# Load Data
# -------------------------------
file_path = '/content/drive/MyDrive/output.csv'
data = pd.read_csv(file_path)

# Convert distances and angles from strings to dictionaries
data['distances'] = data['distances'].apply(eval)
data['angles'] = data['angles'].apply(eval)

# -------------------------------
# Prepare Data
# -------------------------------
# Flatten distances and angles into a single feature vector
features = data['distances'].apply(lambda x: list(x.values())) + data['angles'].apply(lambda x: list(x.values()))
X = torch.tensor(features.tolist(), dtype=torch.float32).to(device)

# Encode labels into numerical values
label_encoder = LabelEncoder()
y = torch.tensor(label_encoder.fit_transform(data['label']), dtype=torch.long).to(device)

# Split data into training and testing sets (80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Print dataset size details
print(f"\nTotal dataset size: {len(data)}")
print(f"Training set size: {len(X_train)}")
print(f"Validation set size: {len(X_test)}\n")

# -------------------------------
# Define DNN Model
# -------------------------------
class DNNClassifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(DNNClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)  # No softmax (CrossEntropyLoss applies softmax internally)
        return x

# -------------------------------
# Initialize Model
# -------------------------------
input_size = X_train.shape[1]
num_classes = len(label_encoder.classes_)
model = DNNClassifier(input_size, num_classes).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# -------------------------------
# Early Stopping Configuration
# -------------------------------
num_epochs = 100
patience = 10
best_loss = float('inf')
counter = 0

# Trackers for plotting graphs
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

# -------------------------------
# Training Loop with Early Stopping
# -------------------------------
for epoch in range(num_epochs):
    # Training
    model.train()
    outputs = model(X_train)
    loss = criterion(outputs, y_train)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # Track training loss and accuracy
    train_losses.append(loss.item())
    train_accuracy = accuracy_score(y_train.cpu(), torch.argmax(outputs, dim=1).cpu())
    train_accuracies.append(train_accuracy)

    # Validation
    model.eval()
    with torch.no_grad():
        y_pred = model(X_test)
        val_loss = criterion(y_pred, y_test).item()
        val_losses.append(val_loss)
        val_accuracy = accuracy_score(y_test.cpu(), torch.argmax(y_pred, dim=1).cpu())
        val_accuracies.append(val_accuracy)
    
    # Logging
    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}, Val Loss: {val_loss:.4f}, Train Acc: {train_accuracy:.4f}, Val Acc: {val_accuracy:.4f}')

    # Early Stopping Logic
    if val_loss < best_loss:
        best_loss = val_loss
        counter = 0
    else:
        counter += 1
        if counter >= patience:
            print(f"\nEarly stopping at epoch {epoch + 1} due to no improvement in validation loss.")
            break

# -------------------------------
# Save Model After Training
# -------------------------------
output_dir = '/content/drive/MyDrive/results'
os.makedirs(output_dir, exist_ok=True)
model_path = os.path.join(output_dir, 'dnn_model.pth')
torch.save(model.state_dict(), model_path)
print(f'\nModel saved to {model_path}')

# -------------------------------
# Evaluation
# -------------------------------
model.eval()
with torch.no_grad():
    y_pred = model(X_test)
    y_pred_classes = torch.argmax(y_pred, dim=1)

# Calculate accuracy and classification report
accuracy = accuracy_score(y_test.cpu(), y_pred_classes.cpu())
report = classification_report(y_test.cpu(), y_pred_classes.cpu(), target_names=label_encoder.classes_)

# -------------------------------
# Print Classification Report
# -------------------------------
print("\n=== Classification Report ===\n")
print(report)
print(f'Validation Accuracy: {accuracy * 100:.2f}%')

# -------------------------------
# Save Confusion Matrix
# -------------------------------
conf_matrix = confusion_matrix(y_test.cpu(), y_pred_classes.cpu())
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, cmap="Blues", fmt='d',
            xticklabels=label_encoder.classes_,
            yticklabels=label_encoder.classes_)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.savefig(os.path.join(output_dir, 'confusion_matrix.png'))
print('Confusion matrix saved.')

# -------------------------------
# Save Training and Validation Accuracy Plot
# -------------------------------
plt.figure(figsize=(8, 6))
plt.plot(range(1, len(train_accuracies) + 1), train_accuracies, label='Train Accuracy')
plt.plot(range(1, len(val_accuracies) + 1), val_accuracies, label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid()
plt.savefig(os.path.join(output_dir, 'model_accuracy.png'))
print('Model accuracy plot saved.')

# -------------------------------
# Save Training and Validation Loss Plot
# -------------------------------
plt.figure(figsize=(8, 6))
plt.plot(range(1, len(train_losses) + 1), train_losses, label='Train Loss')
plt.plot(range(1, len(val_losses) + 1), val_losses, label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.savefig(os.path.join(output_dir, 'model_loss.png'))
print('Model loss plot saved.')

# -------------------------------
# Final Summary
# -------------------------------
print("\nTraining Complete!")
print(f"Validation Accuracy: {accuracy * 100:.2f}%")
print(f"Results saved to: {output_dir}")


In [None]:
#prepare the test input 
import json
import csv
import os

# Define the directory containing JSON files
json_dir = r"E:\study\RPP\project_3\distance\my_test\paper_passing\json"  # Update this path to your JSON directory

# Define the output CSV file path
train_csv = r'E:\study\RPP\project_3\distance\my_test\paper_passing\output.csv'  # Update this path to your desired CSV location

# Define the CSV columns (without the 'label' column)
columns = ['video_name', 'frame_number', 'track_id', 'bbox', 'keypoints', 'distances', 'angles']

# Debug: Indicate CSV creation
print("Creating CSV file...")

# Open the CSV file for writing
with open(train_csv, 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=columns)
    writer.writeheader()

    # Debug: Count total JSON files
    total_files = len([f for f in os.listdir(json_dir) if f.endswith('.json')])
    print(f"Total JSON files to process: {total_files}")

    # Loop through all JSON files in the directory
    for json_file in os.listdir(json_dir):
        if json_file.endswith('.json'):  # Process only JSON files
            json_path = os.path.join(json_dir, json_file)

            # Debug: Indicate loading JSON file
            print(f"Loading JSON file: {json_file}")

            try:
                # Load the JSON data
                with open(json_path, 'r') as file:
                    data = json.load(file)

                # Debug: JSON file loaded
                print(f"JSON file {json_file} loaded successfully.")

                # Extract the video_name
                video_name = data['video_name']

                # Get the frame_number from the JSON file and convert it to an integer
                frame_number = int(data['frame_number'])  # Convert frame_number to integer
                print(f"Processing frame_number: {frame_number}")

                # Loop through each tracked person in the JSON file
                for person in data['tracked_people']:
                    track_id = person['track_id']
                    print(f"Processing track_id: {track_id}")

                    # Prepare the row data
                    row = {
                        'video_name': video_name,
                        'frame_number': frame_number,
                        'track_id': track_id,
                        'bbox': person['bbox'],
                        'keypoints': {kp: {'x': person['keypoints'][kp]['x'], 'y': person['keypoints'][kp]['y']} for kp in person['keypoints']},
                        'distances': person['distances'],
                        'angles': person['angles']
                    }

                    # Write the row to the CSV
                    writer.writerow(row)
                    print(f"Data written for frame_number {frame_number}, track_id {track_id}")

            except json.JSONDecodeError:
                print(f"Skipping corrupted file: {json_file}")
                continue

            # Debug: Indicate JSON file processed
            print(f"Processed JSON file: {json_file}")

    # Debug: CSV file saved
    print("CSV file saved successfully.")

In [None]:
#Testing Code
import os
import cv2
import torch
import pandas as pd

# -------------------------------
# Load Trained Model
# -------------------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model_path = r"C:\Users\navee\Downloads\dnn_model.pth"
input_size = 16
num_classes = 3

class DNNClassifier(torch.nn.Module):
    def __init__(self, input_size, num_classes):
        super(DNNClassifier, self).__init__()
        self.fc1 = torch.nn.Linear(input_size, 128)
        self.relu = torch.nn.ReLU()
        self.fc2 = torch.nn.Linear(128, 64)
        self.fc3 = torch.nn.Linear(64, num_classes)
        self.softmax = torch.nn.Softmax(dim=1)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.softmax(self.fc3(x))
        return x

# Load model and set to evaluation mode
model = DNNClassifier(input_size, num_classes).to(device)
model.load_state_dict(torch.load(model_path, weights_only=True))
model.eval()

# -------------------------------
# Load Test Data
# -------------------------------
test_csv = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_500\output.csv"
image_dir = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_500\img"
results_dir = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_500\output"
os.makedirs(results_dir, exist_ok=True)

# Load test data
test_data = pd.read_csv(test_csv)

# Remove leading/trailing spaces in column names
test_data.columns = test_data.columns.str.strip()

#  Debug: Print column names to verify correctness
print(f"CSV Columns: {test_data.columns.tolist()}")

#  Updated column names based on CSV structure
required_columns = ['frame_number', 'track_id', 'distances', 'angles', 'bbox']
missing_columns = [col for col in required_columns if col not in test_data.columns]

if missing_columns:
    raise KeyError(f"Missing columns in test data: {missing_columns}")

#  Convert distances, angles, and bbox from strings to lists/dictionaries
def convert_to_list(value):
    try:
        value = eval(value)
        if isinstance(value, dict):
            value = list(value.values())  # Convert dict values to list
        return value
    except:
        return value

test_data['distances'] = test_data['distances'].apply(convert_to_list)
test_data['angles'] = test_data['angles'].apply(convert_to_list)
test_data['bbox'] = test_data['bbox'].apply(eval)

# -------------------------------
# Prediction and Visualization
# -------------------------------
results = []
label_map = {0: 'body_movement', 1: 'normal', 2: 'paper_passing'}

#  Group by frame number to handle multiple persons per frame
grouped_data = test_data.groupby('frame_number')

for frame_number, group in grouped_data:
    frame_filename = f"frame_{str(frame_number).zfill(5)}.jpg"  # Format: frame_00001.jpg
    image_path = os.path.join(image_dir, frame_filename)

    #  Load the frame if it exists
    if os.path.exists(image_path):
        image = cv2.imread(image_path)
        if image is None:
            print(f"Skipping frame {frame_number} (Cannot read image)")
            continue
        
        for _, row in group.iterrows():
            track_id = row['track_id']
            distances = row['distances']
            angles = row['angles']
            bbox = row['bbox']  # Format: [x1, y1, x2, y2]

            #  Step 1: Prepare the input feature vector
            if not isinstance(distances, list) or not isinstance(angles, list):
                print(f"Skipping track {track_id} in frame {frame_number} (Invalid feature format)")
                continue
            
            feature_vector = distances + angles
            
            if len(feature_vector) != input_size:
                print(f"Skipping track {track_id} in frame {frame_number} (Invalid feature size: {len(feature_vector)})")
                continue

            input_tensor = torch.tensor(feature_vector, dtype=torch.float32).to(device).unsqueeze(0)

            #  Step 2: Predict using the model
            with torch.no_grad():
                output = model(input_tensor)
                predicted_class = torch.argmax(output, dim=1).item()
                predicted_label = label_map[predicted_class]

            #  Step 3: Save prediction result
            results.append([frame_number, track_id, distances, angles, predicted_label])

            #  Step 4: Draw bounding box and label on image
            x1, y1, x2, y2 = bbox
            color = (255, 255, 255)  # White bounding box and text

            # Draw bounding box
            cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

            # Add label on top of bounding box
            cv2.putText(
                image,
                f"{predicted_label}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.8,
                color,
                2
            )
        
        # Save the updated frame (AFTER processing all persons)
        output_path = os.path.join(results_dir, frame_filename)
        cv2.imwrite(output_path, image)
        print(f" Processed frame: {frame_number}")

# -------------------------------
# Save Results to CSV
# -------------------------------
results_df = pd.DataFrame(results, columns=['Frame', 'Track ID', 'Distances', 'Angles', 'Predicted Label'])
results_csv_path = os.path.join(results_dir, 'predictions.csv')
results_df.to_csv(results_csv_path, index=False)

print(f"\n Predictions saved to: {results_csv_path}")
print(" Testing Complete!")


In [2]:
#Testing Code
import os
import cv2
import torch
import pandas as pd

# -------------------------------
# Load Trained Model
# -------------------------------
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model_path = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_100\dnn_model .pth"
input_size = 16
num_classes = 3

class DNNClassifier(torch.nn.Module):
    def __init__(self, input_size, num_classes):
        super(DNNClassifier, self).__init__()
        self.fc1 = torch.nn.Linear(input_size, 128)
        self.relu = torch.nn.ReLU()
        self.fc2 = torch.nn.Linear(128, 64)
        self.fc3 = torch.nn.Linear(64, num_classes)
        self.softmax = torch.nn.Softmax(dim=1)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.softmax(self.fc3(x))
        return x

# Load model and set to evaluation mode
model = DNNClassifier(input_size, num_classes).to(device)
model.load_state_dict(torch.load(model_path, weights_only=True))
model.eval()

# -------------------------------
# Load Test Data
# -------------------------------
test_csv = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_100\output.csv"
image_dir = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_100\img"
results_dir = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_100\output"
os.makedirs(results_dir, exist_ok=True)

# Load test data
test_data = pd.read_csv(test_csv)

# Remove leading/trailing spaces in column names
test_data.columns = test_data.columns.str.strip()

#  Debug: Print column names to verify correctness
print(f"CSV Columns: {test_data.columns.tolist()}")

#  Updated column names based on CSV structure
required_columns = ['frame_number', 'track_id', 'distances', 'angles', 'bbox']
missing_columns = [col for col in required_columns if col not in test_data.columns]

if missing_columns:
    raise KeyError(f"Missing columns in test data: {missing_columns}")

#  Convert distances, angles, and bbox from strings to lists/dictionaries
def convert_to_list(value):
    try:
        value = eval(value)
        if isinstance(value, dict):
            value = list(value.values())  # Convert dict values to list
        return value
    except:
        return value

test_data['distances'] = test_data['distances'].apply(convert_to_list)
test_data['angles'] = test_data['angles'].apply(convert_to_list)
test_data['bbox'] = test_data['bbox'].apply(eval)

# -------------------------------
# Prediction and Visualization
# -------------------------------
results = []
label_map = {0: 'body_movement', 1: 'normal', 2: 'paper_passing'}

#  Group by frame number to handle multiple persons per frame
grouped_data = test_data.groupby('frame_number')

for frame_number, group in grouped_data:
    frame_filename = f"frame_{str(frame_number).zfill(5)}.jpg"  # Format: frame_00001.jpg
    image_path = os.path.join(image_dir, frame_filename)

    #  Load the frame if it exists
    if os.path.exists(image_path):
        image = cv2.imread(image_path)
        if image is None:
            print(f"Skipping frame {frame_number} (Cannot read image)")
            continue
        
        for _, row in group.iterrows():
            track_id = row['track_id']
            distances = row['distances']
            angles = row['angles']
            bbox = row['bbox']  # Format: [x1, y1, x2, y2]

            #  Step 1: Prepare the input feature vector
            if not isinstance(distances, list) or not isinstance(angles, list):
                print(f"Skipping track {track_id} in frame {frame_number} (Invalid feature format)")
                continue
            
            feature_vector = distances + angles
            
            if len(feature_vector) != input_size:
                print(f"Skipping track {track_id} in frame {frame_number} (Invalid feature size: {len(feature_vector)})")
                continue

            input_tensor = torch.tensor(feature_vector, dtype=torch.float32).to(device).unsqueeze(0)

            #  Step 2: Predict using the model
            with torch.no_grad():
                output = model(input_tensor)
                predicted_class = torch.argmax(output, dim=1).item()
                predicted_label = label_map[predicted_class]

            #  Step 3: Save prediction result
            results.append([frame_number, track_id, distances, angles, predicted_label])

            #  Step 4: Draw bounding box and label on image
            x1, y1, x2, y2 = bbox
            color = (255, 255, 255)  # White bounding box and text

            # Draw bounding box
            cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

            # Add label on top of bounding box
            cv2.putText(
                image,
                f"{predicted_label}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.8,
                color,
                2
            )
        
        # Save the updated frame (AFTER processing all persons)
        output_path = os.path.join(results_dir, frame_filename)
        cv2.imwrite(output_path, image)
        print(f" Processed frame: {frame_number}")

# -------------------------------
# Save Results to CSV
# -------------------------------
results_df = pd.DataFrame(results, columns=['Frame', 'Track ID', 'Distances', 'Angles', 'Predicted Label'])
results_csv_path = os.path.join(results_dir, 'predictions.csv')
results_df.to_csv(results_csv_path, index=False)

print(f"\n Predictions saved to: {results_csv_path}")
print(" Testing Complete!")


CSV Columns: ['video_name', 'frame_number', 'track_id', 'bbox', 'keypoints', 'distances', 'angles']
 Processed frame: 1084
 Processed frame: 1085
 Processed frame: 1086
 Processed frame: 1087
 Processed frame: 1088
 Processed frame: 1089
 Processed frame: 1090
 Processed frame: 1091
 Processed frame: 1092
 Processed frame: 1093
 Processed frame: 1094
 Processed frame: 1095
 Processed frame: 1096
 Processed frame: 1097
 Processed frame: 1098
 Processed frame: 1099
 Processed frame: 1100
 Processed frame: 1101
 Processed frame: 1102
 Processed frame: 1103
 Processed frame: 1104
 Processed frame: 1105
 Processed frame: 1106
 Processed frame: 1107
 Processed frame: 1108
 Processed frame: 1109
 Processed frame: 1110
 Processed frame: 1111
 Processed frame: 1112
 Processed frame: 1113
 Processed frame: 1114
 Processed frame: 1115
 Processed frame: 1116
 Processed frame: 1117
 Processed frame: 1118
 Processed frame: 1119
 Processed frame: 1120
 Processed frame: 1121
 Processed frame: 1122
 Pr

In [3]:
import cv2
import os

# Directory containing the frames
frames_dir = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_100\output"
# Output video path
output_video_path = r"E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_100/video.mp4"

# Get all frame file names and sort them
frame_files = [f for f in os.listdir(frames_dir) if f.endswith('.jpg') or f.endswith('.png')]
frame_files.sort()  # Sort files to maintain order

# Read the first frame to get the dimensions
first_frame = cv2.imread(os.path.join(frames_dir, frame_files[0]))
height, width, layers = first_frame.shape

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for .mp4 format
video_writer = cv2.VideoWriter(output_video_path, fourcc, 5, (width, height))

# Write each frame to the video
for frame_file in frame_files:
    frame = cv2.imread(os.path.join(frames_dir, frame_file))
    video_writer.write(frame)

# Release the video writer
video_writer.release()
cv2.destroyAllWindows()

print(f"Video saved at {output_video_path}")


Video saved at E:\study\RPP\project_3_v1\my_test\paper_passing\epoch_100/video.mp4
