## YOLOv8 model Training for Barbell Detection and Tracking

In [20]:
from ultralytics import YOLO

model = YOLO("yolov8s.pt")  

# Train the model
model.train(
    data="C:\\Users\\kayma\\Downloads\\Bar_Path_Tracking.v4i.yolov8\\data.yaml",
    epochs=20, 
    imgsz=416, 
    batch=4, 
    device="cpu"  
)


New https://pypi.org/project/ultralytics/8.3.182 available  Update with 'pip install -U ultralytics'
Ultralytics 8.3.91  Python-3.9.21 torch-2.6.0+cpu CPU (13th Gen Intel Core(TM) i5-13600KF)
[34m[1mengine\trainer: [0mtask=detect, mode=train, model=yolov8s.pt, data=C:\Users\kayma\Downloads\Bar_Path_Tracking.v4i.yolov8\data.yaml, epochs=20, time=None, patience=100, batch=4, imgsz=416, save=True, save_period=-1, cache=False, device=cpu, workers=8, project=None, name=train5, exist_ok=False, pretrained=True, optimizer=auto, verbose=True, seed=0, deterministic=True, single_cls=False, rect=False, cos_lr=False, close_mosaic=10, resume=False, amp=True, fraction=1.0, profile=False, freeze=None, multi_scale=False, overlap_mask=True, mask_ratio=4, dropout=0.0, val=True, split=val, save_json=False, save_hybrid=False, conf=None, iou=0.7, max_det=300, half=False, dnn=False, plots=True, source=None, vid_stride=1, stream_buffer=False, visualize=False, augment=False, agnostic_nms=False, classes=None

[34m[1mtrain: [0mScanning C:\Users\kayma\Downloads\Bar_Path_Tracking.v4i.yolov8\train\labels.cache... 1606 images, 6 backgrounds, 0 corrupt: 100%|██████████| 1606/1606 [00:00<?, ?it/s]
[34m[1mval: [0mScanning C:\Users\kayma\Downloads\Bar_Path_Tracking.v4i.yolov8\valid\labels.cache... 151 images, 0 backgrounds, 0 corrupt: 100%|██████████| 151/151 [00:00<?, ?it/s]

module 'matplotlib' has no attribute 'backends'
[34m[1moptimizer:[0m 'optimizer=auto' found, ignoring 'lr0=0.01' and 'momentum=0.937' and determining best 'optimizer', 'lr0' and 'momentum' automatically... 
[34m[1moptimizer:[0m AdamW(lr=0.002, momentum=0.9) with parameter groups 57 weight(decay=0.0), 64 weight(decay=0.0005), 63 bias(decay=0.0)





[34m[1mTensorBoard: [0mmodel graph visualization added 
Image sizes 416 train, 416 val
Using 0 dataloader workers
Logging results to [1mruns\detect\train5[0m
Starting training for 20 epochs...

      Epoch    GPU_mem   box_loss   cls_loss   dfl_loss  Instances       Size


       1/20         0G      2.245      9.476      1.342          8        416:   0%|          | 1/402 [00:01<09:16,  1.39s/it]


KeyboardInterrupt: 

## Using MoveNet for pose detection, and YOLOv8 for barbell tracking and path

In [30]:
import cv2
import numpy as np
import os
import tensorflow as tf
import time
import pandas as pd
from ultralytics import YOLO

# =============================
# 1) Load models
# =============================
# Barbell detector
yolo_model = YOLO("runs/detect/train3/weights/best.pt")

# MoveNet Thunder TFLite
interpreter = tf.lite.Interpreter(model_path="movenet_thunder.tflite")
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# =============================
# 2) Globals & thresholds
# =============================
barbell_path = []
bar_center_history = []
sway_values = []
log_data = []

rep_count = 0
squat_down = False
feedback_message = ""

squat_down_angle = 90   # descend threshold
squat_up_angle   = 160  # ascend threshold
max_sway = 50           # px sway window

# =============================
# 3) Helpers
# =============================
def calculate_angle(a, b, c):
    """Angle at b in degrees."""
    a, b, c = np.array(a), np.array(b), np.array(c)
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    return 360 - angle if angle > 180 else int(angle)

# =============================
# 4) Pose estimation + squat logic
# =============================
def detect_pose(frame, frame_idx):
    global rep_count, squat_down, feedback_message

    h, w, _ = frame.shape
    inp = cv2.resize(frame, (256, 256))
    inp = cv2.cvtColor(inp, cv2.COLOR_BGR2RGB)
    inp = np.expand_dims(inp, axis=0).astype(np.uint8)

    interpreter.set_tensor(input_details[0]['index'], inp)
    interpreter.invoke()
    keypoints = interpreter.get_tensor(output_details[0]['index'])[0][0]

    # Skeleton
    skeleton = [
        (5, 6), (5, 7), (7, 9), (6, 8), (8, 10),
        (5, 11), (6, 12), (11, 12), (11, 13), (13, 15),
        (12, 14), (14, 16)
    ]

    points = []
    for y, x, conf in keypoints:
        if conf > 0.3:
            cx, cy = int(x * w), int(y * h)
            points.append((cx, cy))
            cv2.circle(frame, (cx, cy), 5, (0, 255, 0), -1)
        else:
            points.append(None)

    for a, b in skeleton:
        if points[a] and points[b]:
            cv2.line(frame, points[a], points[b], (0, 255, 255), 2)

    avg_knee_angle = None
    try:
        l_hip, l_knee, l_ank = points[11], points[13], points[15]
        r_hip, r_knee, r_ank = points[12], points[14], points[16]
        angles = []

        if l_hip and l_knee and l_ank:
            angles.append(calculate_angle(l_hip, l_knee, l_ank))
        if r_hip and r_knee and r_ank:
            angles.append(calculate_angle(r_hip, r_knee, r_ank))

        if angles:
            avg_knee_angle = int(np.mean(angles))

            # rep logic
            if avg_knee_angle < squat_down_angle and not squat_down:
                squat_down = True
            if avg_knee_angle > squat_up_angle and squat_down:
                rep_count += 1
                squat_down = False

            # feedback
            if avg_knee_angle > squat_up_angle:
                feedback_message = "Too Upright"
            elif avg_knee_angle < 60:
                feedback_message = "Too Deep!"
            else:
                feedback_message = "Good Depth"

            # overlay angle
            cv2.putText(frame, f"Knee Angle: {avg_knee_angle} degrees", (30, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
    except:
        pass

    log_data.append({
        "frame": frame_idx,
        "angle": avg_knee_angle,
        "reps": rep_count,
        "feedback": feedback_message,
        "sway": None
    })

    return frame

# =============================
# 5) Barbell tracking
# =============================
def track_barbell(frame, results, frame_idx):
    global feedback_message

    for box in results.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
        conf = float(box.conf[0])
        if conf > 0.4:
            cx = (x1 + x2) // 2
            cy = (y1 + y2) // 2
            barbell_path.append((cx, cy))
            bar_center_history.append(cx)

            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(frame, "Barbell", (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

    for i in range(1, len(barbell_path)):
        cv2.line(frame, barbell_path[i-1], barbell_path[i], (0, 165, 255), 2)

    if len(bar_center_history) > 10:
        sway_range = max(bar_center_history[-10:]) - min(bar_center_history[-10:])
        sway_values.append(sway_range)
        if sway_range > max_sway:
            feedback_message = "Bar Sway Detected!"

        if len(log_data) > 0 and log_data[-1]["frame"] == frame_idx:
            log_data[-1]["sway"] = sway_range

    return frame

# =============================
# 6) Video I/O
# =============================
video_folder = "Input_Video"
video_filename = "Blue_Plates_BB_Squat_Dark Setting.mp4"
video_path = os.path.join(video_folder, video_filename)

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"Could not open video: {video_path}")

output_folder = "Output_Video"
os.makedirs(output_folder, exist_ok=True)

results_folder = os.path.join(output_folder, "Results")
os.makedirs(results_folder, exist_ok=True)

output_filename = "Blue_Plates_BB_Squat_Dark Setting_output_movenet.mp4"
output_path = os.path.join(output_folder, output_filename)

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps_in = cap.get(cv2.CAP_PROP_FPS)
w = int(cap.get(3)); h = int(cap.get(4))
out = cv2.VideoWriter(output_path, fourcc, fps_in, (w, h))

# =============================
# 7) Main loop
# =============================
frame_idx = 0
start_time = time.time()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    pose_frame = detect_pose(frame.copy(), frame_idx)

    results = yolo_model(frame, imgsz=416, conf=0.35, verbose=False)[0]
    pose_frame = track_barbell(pose_frame, results, frame_idx)

    cv2.putText(pose_frame, f"Reps: {rep_count}", (30, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
    if feedback_message:
        cv2.putText(pose_frame, feedback_message, (30, 120),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 3)

    cv2.imshow("Squat Analyzer (MoveNet)", pose_frame)
    out.write(pose_frame)

    frame_idx += 1
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()

elapsed = time.time() - start_time
avg_fps = frame_idx / elapsed if elapsed > 0 else 0.0
print(f"Saved video: {output_path}")
print(f"Average FPS: {avg_fps:.2f}")

# =============================
# 8) Save logs
# =============================
df = pd.DataFrame(log_data)
df["fps"] = avg_fps
perframe_csv = os.path.join(results_folder, "Blue_Plates_BB_Squat_Dark Setting_results_movenet_perframe.csv")
df.to_csv(perframe_csv, index=False)
print("Per-frame CSV:", perframe_csv)

summary = {
    "total_frames": frame_idx,
    "total_reps": rep_count,
    "avg_fps": avg_fps,
    "mean_angle": float(df["angle"].dropna().mean()) if df["angle"].notna().any() else None,
    "std_angle": float(df["angle"].dropna().std()) if df["angle"].notna().any() else None,
    "mean_sway": float(df["sway"].dropna().mean()) if df["sway"].notna().any() else None,
    "max_sway": float(df["sway"].dropna().max()) if df["sway"].notna().any() else None
}
summary_csv = os.path.join(results_folder, "Blue_Plates_BB_Squat_Dark Setting_results_movenet_summary.csv")
pd.DataFrame([summary]).to_csv(summary_csv, index=False)
print("Summary CSV:", summary_csv)


    TF 2.20. Please use the LiteRT interpreter from the ai_edge_litert package.
    See the [migration guide](https://ai.google.dev/edge/litert/migration)
    for details.
    


Saved video: Output_Video\Blue_Plates_BB_Squat_Dark Setting_output_movenet.mp4
Average FPS: 13.26
Per-frame CSV: Output_Video\Results\Blue_Plates_BB_Squat_Dark Setting_results_movenet_perframe.csv
Summary CSV: Output_Video\Results\Blue_Plates_BB_Squat_Dark Setting_results_movenet_summary.csv


## Using YOLOPose for pose detection, and YOLOv8 for barbell tracking and path

In [None]:
import cv2
import numpy as np
import os
import time
import pandas as pd
from ultralytics import YOLO

# =============================
# 1) Load models
# =============================
# Your barbell detector
yolo_model = YOLO("runs/detect/train3/weights/best.pt")

# Pose model (pretrained COCO-17)
pose_model = YOLO("yolov8n-pose.pt") 

# =============================
# 2) Globals & thresholds
# =============================
barbell_path = []
bar_center_history = []
sway_values = []

rep_count = 0
squat_down = False
feedback_message = ""

# per-frame log
log_data = []

# thresholds
squat_down_angle = 90     # start counting when below this
squat_up_angle   = 160    # finish rep when above this
max_sway = 50             # px (horizontal) over last 10 frames

# =============================
# 3) Helpers
# =============================
def calculate_angle(a, b, c):
    """Angle at b (in degrees) formed by points a-b-c."""
    a = np.array(a); b = np.array(b); c = np.array(c)
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    return 360 - angle if angle > 180 else int(angle)

# =============================
# 4) Pose estimation (YOLOv8-Pose)
# =============================
def detect_pose(frame, frame_idx):
    """
    Runs YOLOv8-Pose; draws full COCO skeleton; computes knee angle,
    updates rep counter + feedback; logs per-frame metrics.
    Returns (annotated_frame, avg_knee_angle or None)
    """
    global rep_count, squat_down, feedback_message

    res = pose_model(frame, imgsz=416, conf=0.25, verbose=False)
    if len(res) == 0:
        # log empty row to keep indexing consistent
        log_data.append({
            "frame": frame_idx, "angle": None, "reps": rep_count,
            "feedback": feedback_message, "sway": None
        })
        return frame, None

    r0 = res[0]
    if r0.keypoints is None or r0.keypoints.xy is None or len(r0.keypoints.xy) == 0:
        log_data.append({
            "frame": frame_idx, "angle": None, "reps": rep_count,
            "feedback": feedback_message, "sway": None
        })
        return frame, None

    # use first person (optionally: pick largest bbox)
    kxy = r0.keypoints.xy[0]
    try:
        kxy = kxy.cpu().numpy()
    except Exception:
        kxy = np.array(kxy)

    # try to get confidences if available
    try:
        kdata = r0.keypoints.data[0].cpu().numpy()  # (K,3) -> [x,y,conf]
        kconf = kdata[:, 2]
    except Exception:
        kconf = np.ones((kxy.shape[0],), dtype=float)

    # COCO-17 skeleton pairs (same as your MoveNet skeleton subset)
    skeleton = [
        (5, 6), (5, 7), (7, 9), (6, 8), (8, 10),
        (5, 11), (6, 12), (11, 12), (11, 13), (13, 15),
        (12, 14), (14, 16)
    ]
    vis_thr = 0.3

    # draw keypoints
    for i, (x, y) in enumerate(kxy):
        if i < len(kconf) and kconf[i] >= vis_thr:
            cv2.circle(frame, (int(x), int(y)), 5, (255,0,255), -1)

    # draw skeleton lines
    for a, b in skeleton:
        if a < len(kxy) and b < len(kxy) and kconf[a] >= vis_thr and kconf[b] >= vis_thr:
            ax, ay = int(kxy[a][0]), int(kxy[a][1])
            bx, by = int(kxy[b][0]), int(kxy[b][1])
            cv2.line(frame, (ax, ay), (bx, by), (255,0,0), 2)

    # indices for knees/ankles/hips in COCO-17
    idx = dict(LHIP=11, LKNE=13, LANK=15, RHIP=12, RKNE=14, RANK=16)

    def get_pt(i):
        if i < len(kxy) and i < len(kconf) and kconf[i] >= vis_thr:
            return (int(kxy[i][0]), int(kxy[i][1]))
        return None

    l_hip, l_knee, l_ank = get_pt(idx["LHIP"]), get_pt(idx["LKNE"]), get_pt(idx["LANK"])
    r_hip, r_knee, r_ank = get_pt(idx["RHIP"]), get_pt(idx["RKNE"]), get_pt(idx["RANK"])

    avg_knee_angle = None
    try:
        angles = []
        if l_hip and l_knee and l_ank:
            angles.append(calculate_angle(l_hip, l_knee, l_ank))
        if r_hip and r_knee and r_ank:
            angles.append(calculate_angle(r_hip, r_knee, r_ank))

        if angles:
            avg_knee_angle = int(np.mean(angles))

            # rep logic
            if avg_knee_angle < squat_down_angle and not squat_down:
                squat_down = True
            if avg_knee_angle > squat_up_angle and squat_down:
                rep_count += 1
                squat_down = False

            # feedback
            if avg_knee_angle > squat_up_angle:
                feedback_message = "Too Upright"
            elif avg_knee_angle < 60:
                feedback_message = "Too Deep!"
            else:
                feedback_message = "Good Depth"

            # overlay
            cv2.putText(frame, f"Knee Angle: {avg_knee_angle} degrees", (30, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
    except Exception:
        pass

    # log this frame; sway filled later by track_barbell
    log_data.append({
        "frame": frame_idx,
        "angle": avg_knee_angle,
        "reps": rep_count,
        "feedback": feedback_message,
        "sway": None
    })

    return frame, avg_knee_angle

# =============================
# 5) Barbell tracking
# =============================

def track_barbell(frame, results, frame_idx):
    global feedback_message

    for box in results.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
        conf = float(box.conf[0])
        if conf > 0.4:
            cx = (x1 + x2) // 2
            cy = (y1 + y2) // 2
            barbell_path.append((cx, cy))
            bar_center_history.append(cx)

            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(frame, "Barbell", (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

    for i in range(1, len(barbell_path)):
        cv2.line(frame, barbell_path[i - 1], barbell_path[i], (0, 165, 255), 2)

    if len(bar_center_history) > 10:
        sway_range = max(bar_center_history[-10:]) - min(bar_center_history[-10:])
        sway_values.append(sway_range)
        if sway_range > max_sway:
            feedback_message = "Bar Sway Detected!"

        if len(log_data) > 0:
            if log_data[-1].get("frame", None) == frame_idx:
                log_data[-1]["sway"] = sway_range

    return frame

# =============================
# 6) Video I/O
# =============================
video_folder = "Input_Video"
video_filename = "Blue_Plates_BB_Squat_Dark Setting.mp4"
video_path = os.path.join(video_folder, video_filename)

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"Could not open video: {video_path}")

# main output folder
output_folder = "Output_Video"
os.makedirs(output_folder, exist_ok=True)

# subfolder for CSV results
results_folder = os.path.join(output_folder, "Results")
os.makedirs(results_folder, exist_ok=True)

# video output file
output_filename = "Blue_Plates_BB_Squat_Dark Setting_output_pose_squat_system.mp4"
output_path = os.path.join(output_folder, output_filename)

# set up VideoWriter properly
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps_in = cap.get(cv2.CAP_PROP_FPS)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, fourcc, fps_in, (w, h))


# =============================
# 7) Main loop
# =============================
frame_idx = 0
start_time = time.time()

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # pose + log
    pose_frame, angle = detect_pose(frame.copy(), frame_idx)

    # barbell detect + sway + draw
    results = yolo_model(frame, imgsz=416, conf=0.35, verbose=False)[0]
    pose_frame = track_barbell(pose_frame, results, frame_idx)

    cv2.putText(pose_frame, f"Reps: {rep_count}", (30, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
    if feedback_message:
        cv2.putText(pose_frame, feedback_message, (30, 120),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 3)

    cv2.imshow("Squat Analyzer (YOLOv8-Pose)", pose_frame)
    out.write(pose_frame)

    frame_idx += 1
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()

elapsed = time.time() - start_time
avg_fps = frame_idx / elapsed if elapsed > 0 else 0.0
print(f"Saved video: {output_path}")
print(f"Average FPS: {avg_fps:.2f}")

# =============================
# 8) Save logs
# =============================
df = pd.DataFrame(log_data)
df["fps"] = avg_fps

# Per-frame CSV
perframe_csv = os.path.join(results_folder,
    "Blue_Plates_BB_Squat_Dark Setting_results_yolopose_perframe.csv")
df.to_csv(perframe_csv, index=False)

# Summary CSV
summary = {
    "total_frames": frame_idx,
    "total_reps": rep_count,
    "avg_fps": avg_fps,
    "mean_angle": float(df["angle"].dropna().mean()) if df["angle"].notna().any() else None,
    "std_angle": float(df["angle"].dropna().std()) if df["angle"].notna().any() else None,
    "mean_sway": float(df["sway"].dropna().mean()) if df["sway"].notna().any() else None,
    "max_sway": float(df["sway"].dropna().max()) if df["sway"].notna().any() else None
}
summary_csv = os.path.join(results_folder,
    "Blue_Plates_BB_Squat_Dark Setting_results_yolopose_summary.csv")
pd.DataFrame([summary]).to_csv(summary_csv, index=False)



Saved video: Output_Video\Blue_Plates_BB_Squat_Dark Setting_output_pose_squat_system.mp4
Average FPS: 12.18
