# YOLOv3 setup: download the weights, network config, and class names

In [None]:
import os

# Download the YOLOv3 weights, network config and COCO class names into
# YOLOV3_Files/, skipping any file that is already present.
YOLO_DIR = "YOLOV3_Files"
YOLO_DOWNLOADS = {
    "yolov3.weights": "https://pjreddie.com/media/files/yolov3.weights",
    "yolov3.cfg": "https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg",
    "coco.names": "https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names",
}

# exist_ok makes the call safe when the directory already exists.
os.makedirs(YOLO_DIR, exist_ok=True)

failed_downloads = []
for file_name, url in YOLO_DOWNLOADS.items():
    target = os.path.join(YOLO_DIR, file_name)
    if os.path.exists(target):
        continue
    # os.system returns the command's exit status; non-zero means wget failed.
    if os.system(f"wget -P {YOLO_DIR} {url}") != 0:
        failed_downloads.append(file_name)
        # Remove any partial file so the next run retries the download
        # instead of skipping it because the path exists.
        if os.path.exists(target):
            os.remove(target)

# Only claim success when every download actually succeeded.
if failed_downloads:
    print("Failed to download: " + ", ".join(failed_downloads))
else:
    print("Files have been downloaded to YOLOV3_Files folder.")


In [None]:
# Absolute paths of the YOLOv3 config, weights and class-name files.
# NOTE(review): the download cell writes to the relative "YOLOV3_Files"
# directory, so these paths only match when the working directory is /content.
_YOLO_DIR = "/content/YOLOV3_Files"
cfg_path = f"{_YOLO_DIR}/yolov3.cfg"
weights_path = f"{_YOLO_DIR}/yolov3.weights"
names_path = f"{_YOLO_DIR}/coco.names"

In [None]:
import cv2
import numpy as np
# Load YOLOv3 model (same as before)

def load_yolov3_model(cfg_path, weights_path, names_path):
    """Create the YOLOv3 network and read its class labels.

    Args:
        cfg_path: Path to the network configuration file.
        weights_path: Path to the trained weights file.
        names_path: Path to a text file with one class name per line.

    Returns:
        A ``(net, classes)`` tuple: the ``cv2.dnn`` network configured to
        prefer the CUDA backend/target, and the list of class names.
    """
    detector = cv2.dnn.readNet(weights_path, cfg_path)

    # Ask OpenCV to run inference through CUDA. (The OpenCV backend with an
    # OpenCL target is the alternative that was tried here previously.)
    detector.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
    detector.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

    # One label per line; surrounding whitespace of the whole file is trimmed.
    with open(names_path, "r") as names_file:
        labels = names_file.read().strip().split("\n")

    return detector, labels

# Detect pedestrians in a single frame
def detect_pedestrians_in_frame(frame, model, classes, confidence_threshold=0.2, nms_threshold=0.2):
    """Run the detector on one frame and keep only "person" boxes.

    Args:
        frame: Image array; its first two dimensions are (height, width).
        model: Network exposing ``setInput``, ``getUnconnectedOutLayersNames``
            and ``forward`` (as returned by ``load_yolov3_model``).
        classes: List of class names; must contain ``"person"``.
        confidence_threshold: Minimum class score for a detection to be kept.
        nms_threshold: Overlap threshold passed to non-maximum suppression.

    Returns:
        ``(filtered_boxes, indices)`` where ``filtered_boxes`` is a list of
        ``[x, y, w, h]`` boxes (top-left corner plus size, in pixels) that
        survived non-maximum suppression, and ``indices`` is the value
        returned by ``cv2.dnn.NMSBoxes`` (flattened when non-empty).
    """
    frame_h, frame_w = frame.shape[:2]

    # Build the 416x416 input blob (scaled to [0, 1], channels swapped).
    net_input = cv2.dnn.blobFromImage(frame, 1/255.0, (416, 416), swapRB=True, crop=False)
    model.setInput(net_input)

    # Forward pass through every unconnected output layer.
    raw_outputs = model.forward(model.getUnconnectedOutLayersNames())

    person_id = classes.index("person")
    to_pixels = np.array([frame_w, frame_h, frame_w, frame_h])

    boxes = []
    confidences = []
    for layer_output in raw_outputs:
        for row in layer_output:
            # Entries from index 5 onward are the per-class scores.
            class_scores = row[5:]
            best_class = np.argmax(class_scores)
            score = class_scores[best_class]
            if best_class != person_id or not score > confidence_threshold:
                continue

            # The first four entries are centre-x, centre-y, width, height
            # relative to the frame; convert to a pixel top-left box.
            cx, cy, bw, bh = (row[0:4] * to_pixels).astype("int")
            left = int(cx - (bw / 2))
            top = int(cy - (bh / 2))
            boxes.append([left, top, int(bw), int(bh)])
            confidences.append(float(score))

    # Non-maximum suppression removes overlapping duplicates.
    keep = cv2.dnn.NMSBoxes(boxes, confidences, confidence_threshold, nms_threshold)

    if len(keep) > 0:
        keep = keep.flatten()
        kept_boxes = [boxes[k] for k in keep]
    else:
        kept_boxes = []

    print("Frame Finished")
    return kept_boxes, keep





# Object Tracking

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Intersection over Union (IoU) calculation
def iou(boxA, boxB):
    """Return the intersection-over-union of two ``[x, y, w, h]`` boxes.

    Each box is given as top-left corner plus width and height. The result is
    the overlap area divided by the union area, or ``0`` when the union area
    is zero.
    """
    # Edges of the overlapping rectangle (may be inverted when disjoint).
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[0] + boxA[2], boxB[0] + boxB[2])
    bottom = min(boxA[1] + boxA[3], boxB[1] + boxB[3])

    # Clamp each side at zero so disjoint boxes give zero overlap.
    overlap = max(0, right - left) * max(0, bottom - top)
    union = boxA[2] * boxA[3] + boxB[2] * boxB[3] - overlap

    if union == 0:
        return 0
    return overlap / union

# Track objects across frames using IoU matching
def track_objects(prev_boxes, curr_boxes, iou_threshold=0.1):
    """Greedily pair previous boxes with current boxes by IoU.

    Previous boxes are visited in order; each one claims the still-unclaimed
    current box with the highest IoU, provided that IoU is above both zero
    and ``iou_threshold``.

    Args:
        prev_boxes: Boxes from the previous frame, ``[x, y, w, h]`` each.
        curr_boxes: Boxes from the current frame, ``[x, y, w, h]`` each.
        iou_threshold: Minimum IoU (exclusive) for a pair to count as a match.

    Returns:
        ``(matches, unmatched_prev, unmatched_curr)``: a list of
        ``(prev_index, curr_index)`` pairs, and the indices left unpaired on
        each side.
    """
    pairs = []
    free_prev = list(range(len(prev_boxes)))
    free_curr = list(range(len(curr_boxes)))

    for prev_idx, old_box in enumerate(prev_boxes):
        winner = None
        winner_score = 0

        # Only current boxes that nobody has claimed yet are candidates.
        for curr_idx in free_curr:
            overlap = iou(old_box, curr_boxes[curr_idx])
            if overlap > iou_threshold and overlap > winner_score:
                winner = curr_idx
                winner_score = overlap

        if winner is None:
            continue

        pairs.append((prev_idx, winner))
        free_prev.remove(prev_idx)
        free_curr.remove(winner)

    return pairs, free_prev, free_curr


# Install Lightweight OpenPose (lightweight-human-pose-estimation.pytorch)

In [None]:
# Work from /content so the relative existence check and the clone below
# both resolve there.
%cd /content/

import os
from os.path import exists, join, basename, splitext

git_repo_url = 'https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch.git'
git_path = '/content/lightweight-human-pose-estimation.pytorch'
# Directory inside the clone that receives the pretrained checkpoint.
pre_model_path = os.path.join(git_path,'pre_model')

# Repository directory name: last component of the URL without its extension.
project_name = splitext(basename(git_repo_url))[0]
# Clone and download only when the repository directory is not already there.
if not exists(project_name):
  !git clone -q --depth 1 $git_repo_url
  # NOTE(review): the COCO annotations do not appear to be used by the
  # inference code visible in this notebook - confirm before keeping this
  # download and the unzip step below.
  !wget "http://images.cocodataset.org/annotations/annotations_trainval2017.zip" -P $git_path
  # Pretrained pose-estimation checkpoint, saved under pre_model/.
  !wget "https://download.01.org/opencv/openvino_training_extensions/models/human_pose_estimation/checkpoint_iter_370000.pth" -P $pre_model_path
  !mkdir coco && unzip "/content/lightweight-human-pose-estimation.pytorch/annotations_trainval2017.zip" -d "/content/coco/"

# Make the cloned repository the working directory for the following cells.
%cd $git_path

In [None]:
# Install the dependencies listed in requirements.txt; the relative path is
# resolved against the current working directory.
!pip install -r requirements.txt

In [None]:
import sys
sys.path.append('/content/lightweight-human-pose-estimation.pytorch')

import argparse

import cv2
import numpy as np
import torch

from models.with_mobilenet import PoseEstimationWithMobileNet
from modules.keypoints import extract_keypoints, group_keypoints
from modules.load_state import load_state
from modules.pose import Pose, track_poses
from val import normalize, pad_width


class ImageReader(object):
    """Iterator that loads a list of image files one at a time.

    Each iteration step reads the next path in ``file_names`` with
    ``cv2.imread(..., cv2.IMREAD_COLOR)`` and yields the decoded image.

    Raises:
        IOError: When an image cannot be read.
    """

    def __init__(self, file_names):
        self.file_names = file_names
        self.max_idx = len(file_names)

    def __iter__(self):
        # Restart from the first file every time iteration begins.
        self.idx = 0
        return self

    def __next__(self):
        if self.idx == self.max_idx:
            raise StopIteration
        file_name = self.file_names[self.idx]
        img = cv2.imread(file_name, cv2.IMREAD_COLOR)
        # cv2.imread reports failure by returning None instead of raising,
        # so test for None before touching .size; otherwise a missing file
        # surfaces as AttributeError rather than the intended IOError.
        if img is None or img.size == 0:
            raise IOError('Image {} cannot be read'.format(file_name))
        self.idx = self.idx + 1
        return img


class VideoReader(object):
    """Iterator over the frames of a video file or webcam.

    ``file_name`` may be a path or something convertible to ``int``; values
    that parse as an integer are stored as ``int`` because OpenCV needs an
    integer index to read from a webcam.

    Raises:
        IOError: From ``__iter__`` when the source cannot be opened.
    """

    def __init__(self, file_name):
        self.file_name = file_name
        try:  # OpenCV needs int to read from webcam
            self.file_name = int(file_name)
        except ValueError:
            pass

    def __iter__(self):
        # Release a capture left over from an earlier iteration so that
        # restarting does not leak the underlying handle.
        previous = getattr(self, 'cap', None)
        if previous is not None:
            previous.release()

        self.cap = cv2.VideoCapture(self.file_name)
        if not self.cap.isOpened():
            raise IOError('Video {} cannot be opened'.format(self.file_name))
        return self

    def __next__(self):
        was_read, img = self.cap.read()
        if not was_read:
            # End of stream: free the capture before stopping iteration.
            self.cap.release()
            raise StopIteration
        return img




def infer_fast(net, img, net_input_height_size, stride, upsample_ratio, cpu,
               pad_value=(0, 0, 0), img_mean=(128, 128, 128), img_scale=1/256):
    """Run the pose network on one image and return upsampled heatmaps/PAFs.

    The image is resized so its height equals ``net_input_height_size``,
    normalised with ``normalize`` and padded with ``pad_width`` before the
    forward pass.

    Args:
        net: Pose-estimation ``torch`` module. It is moved to the device
            selected by ``cpu`` before the forward pass.
        img: Image array of shape (height, width, channels).
        net_input_height_size: Target height of the resized image.
        stride: Stride forwarded to ``pad_width``.
        upsample_ratio: Factor by which the network outputs are enlarged.
        cpu: When true, run on the CPU; otherwise run on CUDA.
        pad_value: Fill value forwarded to ``pad_width``.
        img_mean: Mean forwarded to ``normalize``.
        img_scale: Scale forwarded to ``normalize``.

    Returns:
        ``(heatmaps, pafs, scale, pad)``: the second-to-last and last network
        outputs as channel-last arrays enlarged by ``upsample_ratio``, the
        resize factor applied to ``img``, and the padding returned by
        ``pad_width``.
    """
    height, _, _ = img.shape
    scale = net_input_height_size / height

    scaled_img = cv2.resize(img, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
    scaled_img = normalize(scaled_img, img_mean, img_scale)
    min_dims = [net_input_height_size, max(scaled_img.shape[1], net_input_height_size)]
    padded_img, pad = pad_width(scaled_img, stride, pad_value, min_dims)

    # Honour the cpu flag: previously the tensor was always sent to CUDA,
    # which ignored cpu=True and failed on machines without a GPU. Moving the
    # network as well keeps the input and the weights on the same device.
    device = torch.device('cpu' if cpu else 'cuda')
    net.to(device)
    tensor_img = torch.from_numpy(padded_img).permute(2, 0, 1).unsqueeze(0).float().to(device)

    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        stages_output = net(tensor_img)

    stage2_heatmaps = stages_output[-2]
    heatmaps = np.transpose(stage2_heatmaps.squeeze().detach().cpu().numpy(), (1, 2, 0))
    heatmaps = cv2.resize(heatmaps, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    stage2_pafs = stages_output[-1]
    pafs = np.transpose(stage2_pafs.squeeze().detach().cpu().numpy(), (1, 2, 0))
    pafs = cv2.resize(pafs, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    return heatmaps, pafs, scale, pad

# Main

In [None]:
# @title
import cv2
import numpy as np
import torch
# Pairs of indices into a pose's 18-entry keypoint array; a line is drawn
# between the two keypoints of every pair when both were detected.
keypoint_connections = [
    [1, 2],
    [1, 5],
    [2, 3],
    [3, 4],
    [5, 6],
    [6, 7],
    [1, 8],
    [8, 9],
    [9, 10],
    [1, 11],
    [11, 12],
    [12, 13],
    [1, 0],
    [0, 14],
    [14, 16],
    [0, 15],
    [15, 17],
    [2, 16],
    [5, 17],
]

# Module-level tracking state shared across process_frame calls:
# tracker ID -> most recent [x, y, w, h] box for that ID.
object_tracker = {}
# Next unused tracker ID; incremented each time an ID is handed out.
next_id = 0

def _resolve_previous_ids(prev_detections):
    """Return, for each previous box, the tracker ID that owns it (or None).

    ``process_frame`` returns ``list(object_tracker.values())``, so when that
    list is passed back unchanged the i-th previous box belongs to the i-th
    tracker entry. The positional lookup is tried first because it stays
    unambiguous when two tracked boxes have identical coordinates; a search
    by value among the IDs not yet claimed is the fallback.
    """
    snapshot = list(object_tracker.items())
    claimed = set()
    owners = []
    for idx, box in enumerate(prev_detections):
        owner = None
        if (idx < len(snapshot) and snapshot[idx][0] not in claimed
                and snapshot[idx][1] == box):
            owner = snapshot[idx][0]
        else:
            for obj_id, tracked_box in snapshot:
                if obj_id not in claimed and tracked_box == box:
                    owner = obj_id
                    break
        if owner is not None:
            claimed.add(owner)
        owners.append(owner)
    return owners


def _update_tracker(prev_detections, boxes, iou_threshold):
    """Bring the global ``object_tracker`` in line with this frame's boxes."""
    global object_tracker, next_id

    if prev_detections is None:
        # First frame of a run: discard boxes left in the module-level
        # tracker by an earlier run, then give every detection a fresh ID.
        object_tracker.clear()
        for box in boxes:
            object_tracker[next_id] = box
            next_id += 1
        return

    # Resolve owners before mutating the tracker so that an updated box can
    # never be mistaken for another track's previous box.
    owners = _resolve_previous_ids(prev_detections)
    matches, unmatched_prev, unmatched_curr = track_objects(prev_detections, boxes, iou_threshold)

    # Matched boxes keep their ID and take the new position.
    for prev_idx, curr_idx in matches:
        owner = owners[prev_idx]
        if owner is None:
            # No tracker entry owns the previous box; start a new track
            # rather than silently dropping the detection.
            owner = next_id
            next_id += 1
        object_tracker[owner] = boxes[curr_idx]

    # Detections without a predecessor start new tracks.
    for curr_idx in unmatched_curr:
        object_tracker[next_id] = boxes[curr_idx]
        next_id += 1

    # Tracks whose box was not re-detected are dropped.
    for prev_idx in unmatched_prev:
        owner = owners[prev_idx]
        if owner is not None:
            object_tracker.pop(owner, None)


def _estimate_roi_poses(net, roi, height_size, stride, upsample_ratio, cpu):
    """Run pose estimation on one crop; keypoints are in crop coordinates."""
    num_keypoints = 18
    heatmaps, pafs, scale, pad = infer_fast(net, roi, height_size, stride, upsample_ratio, cpu)

    all_keypoints_by_type = []
    total_keypoints_num = 0
    for kpt_idx in range(num_keypoints):  # 19th for bg
        total_keypoints_num += extract_keypoints(heatmaps[:, :, kpt_idx], all_keypoints_by_type, total_keypoints_num)

    pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05)

    # Undo the network-side upsampling, padding and resize so coordinates
    # refer to pixels of the crop that was passed in.
    for kpt_id in range(all_keypoints.shape[0]):
        all_keypoints[kpt_id, 0] = (all_keypoints[kpt_id, 0] * stride / upsample_ratio - pad[1]) / scale
        all_keypoints[kpt_id, 1] = (all_keypoints[kpt_id, 1] * stride / upsample_ratio - pad[0]) / scale

    poses = []
    for entry in pose_entries:
        if len(entry) == 0:
            continue
        # -1 marks a keypoint that was not found.
        pose_keypoints = np.ones((num_keypoints, 2), dtype=np.int32) * -1
        for kpt_id in range(num_keypoints):
            if entry[kpt_id] != -1.0:  # Keypoint was found
                found = all_keypoints[int(entry[kpt_id])]
                pose_keypoints[kpt_id, 0] = int(found[0])
                pose_keypoints[kpt_id, 1] = int(found[1])
        poses.append(Pose(pose_keypoints, entry[18]))
    return poses


def _draw_pose(frame, keypoints, offset_x, offset_y):
    """Draw one skeleton on ``frame``, shifting crop coordinates by the offset."""
    points = []
    for kpt_x, kpt_y in keypoints:
        if kpt_x == -1 or kpt_y == -1:
            points.append(None)  # keypoint was not detected
            continue
        # Plain ints keep the OpenCV drawing calls happy with NumPy scalars.
        point = (int(offset_x + kpt_x), int(offset_y + kpt_y))
        points.append(point)
        cv2.circle(frame, point, 5, (0, 0, 255), -1)  # red keypoints

    for start, end in keypoint_connections:
        if points[start] is not None and points[end] is not None:
            cv2.line(frame, points[start], points[end], (0, 255, 0), 2)  # green lines


def process_frame(frame, prev_detections, model, classes, net, height_size, stride, upsample_ratio, cpu, track, smooth, iou_threshold=0.1):
    """Detect, track and pose-estimate the people in one video frame.

    Updates the module-level ``object_tracker`` / ``next_id`` state and draws
    the tracked boxes, their IDs and the estimated skeletons onto ``frame``
    in place.

    Args:
        frame: Image array for the current frame; drawn on in place.
        prev_detections: Boxes returned by the previous call, or ``None`` for
            the first frame of a run (which also clears the tracker).
        model: Detector passed to ``detect_pedestrians_in_frame``.
        classes: Class names passed to ``detect_pedestrians_in_frame``.
        net: Pose-estimation network passed to ``infer_fast``.
        height_size: Network input height passed to ``infer_fast``.
        stride: Network stride passed to ``infer_fast``.
        upsample_ratio: Upsampling factor passed to ``infer_fast``.
        cpu: Device flag passed to ``infer_fast``.
        track: Accepted for interface compatibility; currently unused.
        smooth: Accepted for interface compatibility; currently unused.
        iou_threshold: Minimum IoU for a box to continue an existing track.

    Returns:
        ``(frame, boxes)``: the annotated frame and the list of currently
        tracked ``[x, y, w, h]`` boxes, to be passed back as
        ``prev_detections`` on the next call.
    """
    # Detect pedestrians in the current frame (YOLO detection)
    boxes, _ = detect_pedestrians_in_frame(frame, model, classes)

    # Track objects using IoU
    _update_tracker(prev_detections, boxes, iou_threshold)

    frame_h, frame_w = frame.shape[:2]

    # Pose estimation per tracked person. Each pose is stored together with
    # the origin of the crop it was computed from, so it is later drawn at
    # the box it belongs to.
    placed_poses = []
    for obj_id, (x, y, w, h) in object_tracker.items():
        # Clamp the box to the frame: negative starts would otherwise be
        # read as "from the end" by slicing and give an empty crop.
        left = max(0, int(x))
        top = max(0, int(y))
        right = min(frame_w, int(x + w))
        bottom = min(frame_h, int(y + h))
        if right <= left or bottom <= top:
            continue  # nothing of this box is inside the frame

        # float32 matches the dtype the pose network previously received,
        # without the detour through a CUDA tensor.
        roi = frame[top:bottom, left:right].astype(np.float32)

        for pose in _estimate_roi_poses(net, roi, height_size, stride, upsample_ratio, cpu):
            pose.id = obj_id  # tie the pose to the track it was found in
            placed_poses.append((pose, left, top))

    # Draw bounding boxes and IDs
    for obj_id, (x, y, w, h) in object_tracker.items():
        color = (0, 255, 0)  # Green color for the bounding box
        cv2.rectangle(frame, (int(x), int(y)), (int(x + w), int(y + h)), color, 2)
        cv2.putText(frame, f"ID {obj_id}", (int(x), int(y - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    # Draw poses at the position of the crop they came from
    for pose, offset_x, offset_y in placed_poses:
        _draw_pose(frame, pose.keypoints, offset_x, offset_y)

    return frame, list(object_tracker.values())



def display_video_with_tracking(video_path, output_path, model, classes, net, cpu=True, height_size=256, num_keypoints=18, track=True, smooth=True, iou_threshold=0.1):
    """Process a video frame by frame and write the annotated result to disk.

    Despite the name, nothing is shown on screen: every frame is passed to
    ``process_frame`` and the returned frame is written to ``output_path``.

    Args:
        video_path: Input video to read.
        output_path: Destination file, encoded with the ``mp4v`` codec.
        model: Detector forwarded to ``process_frame``.
        classes: Class names forwarded to ``process_frame``.
        net: Pose-estimation network forwarded to ``process_frame``.
        cpu: Device flag forwarded to ``process_frame``.
        height_size: Pose network input height forwarded to ``process_frame``.
        num_keypoints: Accepted for interface compatibility; currently unused.
        track: Forwarded to ``process_frame``.
        smooth: Forwarded to ``process_frame``.
        iou_threshold: Forwarded to ``process_frame``.

    Raises:
        IOError: When ``video_path`` cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)  # Open the video file
    if not cap.isOpened():
        # Fail loudly instead of silently writing an empty output file;
        # same message format as VideoReader.
        cap.release()
        raise IOError('Video {} cannot be opened'.format(video_path))

    out = None
    try:
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # The container did not report a usable frame rate; fall back
            # to an assumed 30 fps rather than creating the writer with 0.
            fps = 30.0

        # Initialize the VideoWriter
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # MP4 codec
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

        prev_detections = None  # None marks the first frame for process_frame

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Process the frame with detection, tracking, and pose estimation
            frame, prev_detections = process_frame(
                frame,
                prev_detections,
                model,
                classes,
                net,
                height_size=height_size,
                stride=8,  # network stride used by the pose post-processing
                upsample_ratio=4,  # upsampling factor for heatmaps and PAFs
                cpu=cpu,
                track=track,
                smooth=smooth,
                iou_threshold=iou_threshold
            )

            # Write the processed frame to the output video
            out.write(frame)
    finally:
        # Release the capture and writer even when processing raises, so the
        # output file is finalised and the handles are not leaked.
        cap.release()
        if out is not None:
            out.release()
if __name__ == "__main__":
    # Input/output locations and model files.
    video_path = "/content/Input.mp4"
    cfg_path = "/content/YOLOV3_Files/yolov3.cfg"
    weights_path = "/content/YOLOV3_Files/yolov3.weights"
    names_path = "/content/YOLOV3_Files/coco.names"
    output_path = "/content/MK_output.mp4"
    checkpoint_path = "/content/lightweight-human-pose-estimation.pytorch/pre_model/checkpoint_iter_370000.pth"

    # Load YOLO model
    model, classes = load_yolov3_model(cfg_path, weights_path, names_path)

    # Use the GPU for pose estimation only when one is actually available.
    use_cuda = torch.cuda.is_available()

    # Load the pose estimation model. The network is built once; the
    # checkpoint is mapped to CPU first so loading does not depend on the
    # device it was saved from.
    net = PoseEstimationWithMobileNet()
    checkpoint = torch.load(checkpoint_path, map_location='cpu')
    load_state(net, checkpoint)

    # Inference mode: without eval() layers such as batch normalisation and
    # dropout keep their training behaviour.
    net = net.eval()
    if use_cuda:
        net = net.to('cuda')
    print(next(net.parameters()).device)  # Report where the network lives

    # Write the video with detection, tracking, and pose estimation
    display_video_with_tracking(video_path, output_path, model, classes, net, cpu=not use_cuda, height_size=256, num_keypoints=18, track=True, smooth=True, iou_threshold=0.1)
