# Pre-requisites


Installing required packages


In [1]:
%pip install 'git+https://github.com/facebookresearch/segment-anything.git'
%pip install -q roboflow supervision
!wget -q 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth'
%pip install mediapipe
%pip install ultralytics
%pip install zip

Collecting git+https://github.com/facebookresearch/segment-anything.git
  Cloning https://github.com/facebookresearch/segment-anything.git to /tmp/pip-req-build-ueai3_w8
  Running command git clone --filter=blob:none --quiet https://github.com/facebookresearch/segment-anything.git /tmp/pip-req-build-ueai3_w8
  Resolved https://github.com/facebookresearch/segment-anything.git to commit dca509fe793f601edb92606367a655c15ac00fdf
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: segment_anything
  Building wheel for segment_anything (setup.py) ... [?25l[?25hdone
  Created wheel for segment_anything: filename=segment_anything-1.0-py3-none-any.whl size=36592 sha256=359591c2d9f8715ae10bc3402de15754cf07934fed240b8228c45bbe3c7fca89
  Stored in directory: /tmp/pip-ephem-wheel-cache-0kb4_kpo/wheels/10/cf/59/9ccb2f0a1bcc81d4fbd0e501680b5d088d690c6cfbc02dc99d
Successfully built segment_anything
Installing collected packages: segment_anything
Successfully 

Importing all the required libraries


In [2]:
import os
import cv2
import torch
import numpy as np
from PIL import Image
from ultralytics import YOLO
import supervision as sv
from segment_anything import sam_model_registry, SamPredictor, SamAutomaticMaskGenerator
from google.colab.patches import cv2_imshow
import mediapipe as mp
import csv


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


# Creating frames to use


Creating image frames from video input

In [3]:
def video_to_frames(input_video_path, output_folder, fps=20):
    # Open the video file
    cap = cv2.VideoCapture(input_video_path)

    # Get the original frames per second (fps) of the video
    original_fps = cap.get(cv2.CAP_PROP_FPS)

    # Get the total number of frames in the video
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(original_fps)
    print(total_frames)
    # Calculate the interval between frames based on the desired fps
    interval = int(original_fps / fps)
    print(interval)
    # Create the output folder if it doesn't exist
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    frame_number = 0
    saved_frame_count = 0

    while cap.isOpened():
        ret, frame = cap.read()

        if not ret:
            break  # Break if the video ends

        # Save frames at the specified interval
        if frame_number % interval == 0:
            frame_filename = os.path.join(output_folder, f"frame_{saved_frame_count:04d}.jpg")
            cv2.imwrite(frame_filename, frame)  # Save frame as image
            saved_frame_count += 1

        frame_number += 1

    cap.release()  # Release the video capture object
    print(f"Extracted {saved_frame_count} frames.")



In [4]:

input_video_path = 'input_video.mp4'
output_folder = 'Image_frames'
fps = 15

video_to_frames(input_video_path, output_folder, fps)


30.0
1187
2
Extracted 594 frames.


# Task - 1

Loading YOLOv8 and SAM model

In [5]:

# Load YOLOv8 model (for human detection)
model = YOLO("yolov8l.pt")

# Load SAM model
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
MODEL_TYPE = "vit_h"
sam = sam_model_registry[MODEL_TYPE](checkpoint='sam_vit_h_4b8939.pth')
sam.to(device=DEVICE)
mask_predictor = SamPredictor(sam)


Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8l.pt to 'yolov8l.pt'...


100%|██████████| 83.7M/83.7M [00:00<00:00, 342MB/s]


Defining the input and output directories

In [6]:
# Define input and output directories
input_dir = 'Image_frames'  # Directory containing input images
output_dir = 'Segmented_frames'  # Directory to save segmented images

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)


Funtion to execute YOLO and SAM Segementation

In [7]:

# Process each image in the input directory
for image_file in os.listdir(input_dir):
    if image_file.endswith(('.png', '.jpg', '.jpeg')):
        image_path = os.path.join(input_dir, image_file)
        print(image_path)
        img = Image.open(image_path)

        # Convert to OpenCV format
        img_cv = np.array(img)
        img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)

        # Apply YOLOv8 to detect persons in the image
        results = model(img_cv)  # Run YOLOv8 on the image

        # Extract detection results
        boxes = results[0].boxes  # Get bounding boxes from the first result
        confidences = boxes.conf  # Confidence scores of detections

        # List to store bounding box coordinates for detected persons
        person_boxes = []
        for box, conf in zip(boxes.xywh, confidences):
            if conf > 0:
                x, y, w, h = box  # Extract box coordinates (x, y, width, height)
                x1, y1, x2, y2 = int(x - w / 2), int(y - h / 2), int(x + w / 2), int(y + h / 2)
                person_boxes.append((x1, y1, x2, y2))

        all_masks = []

        # Process each detected person box

        if person_boxes:
          box = person_boxes[0]
          box_np = np.array([
              box[0],  # x1
              box[1],  # y1
              box[2],  # x2
              box[3]   # y2
          ])

          # Set the image for the mask predictor
          mask_predictor.set_image(cv2.cvtColor(img_cv, cv2.COLOR_BGR2RGB))

          # Perform segmentation using the bounding box
          masks, scores, logits = mask_predictor.predict(
              box=box_np,
              multimask_output=True
          )

          # Append the masks to the all_masks list
          all_masks.extend(masks)

        # Combine all masks into a single mask
        if all_masks:
            combined_mask = np.sum(all_masks, axis=0) > 0
        else:
            combined_mask = np.zeros_like(img_cv[:, :, 0], dtype=bool)

        # Create a transparent overlay
        overlay = img_cv.copy()
        overlay[combined_mask] = [0, 255, 0]

        # Create an alpha channel for transparency
        alpha = 0.5
        transparent_mask = cv2.addWeighted(overlay, alpha, img_cv, 1 - alpha, 0)

        # Save the transparent mask image
        output_path = os.path.join(output_dir, f'{image_file}')
        cv2.imwrite(output_path, transparent_mask)  # Save the image

Image_frames/frame_0060.jpg

0: 384x640 1 person, 117.9ms
Speed: 14.1ms preprocess, 117.9ms inference, 771.4ms postprocess per image at shape (1, 3, 384, 640)
Image_frames/test_image.jpg

0: 320x640 1 person, 63.6ms
Speed: 3.2ms preprocess, 63.6ms inference, 1.5ms postprocess per image at shape (1, 3, 320, 640)


Saving the segemented frames into zip file for future use

In [8]:
!zip -r Segmented_frames.zip Segmented_frames/

  adding: Segmented_frames/ (stored 0%)
  adding: Segmented_frames/frame_0060.jpg (deflated 0%)
  adding: Segmented_frames/test_image.jpg (deflated 2%)


Function to create segmented video from the segmented frames

In [9]:
def frames_to_video(output_folder, output_video_path, fps=20):
    # Get all the frame filenames in sorted order
    frame_files = sorted([f for f in os.listdir(output_folder) if f.endswith('.jpg')])

    # Read the first frame to get dimensions
    first_frame = cv2.imread(os.path.join(output_folder, frame_files[0]))
    height, width, _ = first_frame.shape

    # Initialize video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # You can use other codecs too
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # Write frames to video
    for frame_file in frame_files:
        frame_path = os.path.join(output_folder, frame_file)
        frame = cv2.imread(frame_path)
        out.write(frame)

    out.release()
    print(f"Video saved to {output_video_path}")


output_video_path = 'segmented_video.mp4'
frames_to_video(output_dir, output_video_path, fps=15)


Video saved to segmented_video.mp4


# Task - 2

Loading Yolo


In [10]:

# 1. Load YOLOv8 model (for human detection)
model = YOLO("yolov8l.pt")

# 2. Load Pose Estimation Model (using MediaPipe)
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# 3. Define the folder containing the images
pose_folder = 'Poses'  # Folder to save pose estimated images


Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.25M/6.25M [00:00<00:00, 97.4MB/s]


creating csv file for key point use

In [11]:

# Make sure the "poses" folder exists
os.makedirs(pose_folder, exist_ok=True)

# Define the CSV file to store keypoints
csv_file = 'pose_keypoints.csv'

# Create or overwrite CSV file and write headers
with open(csv_file, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['Image Name', 'Keypoint Index', 'X', 'Y', 'Visibility'])


Pose estimation and using YOLO and mediaPipe, also saving the keypoints coordinates in the csv file

In [12]:

# 4. Loop over all images in the folder
for image_name in os.listdir(input_dir):
    if image_name.endswith(('.jpg', '.jpeg', '.png')):
        # Load the image
        img_path = os.path.join(input_dir, image_name)
        img = Image.open(img_path)

        # Convert to OpenCV format
        img_cv = np.array(img)
        img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)

        # 5. Preprocessing: Apply Gaussian blur or median filter to reduce noise from water bubbles
        img_blur = cv2.GaussianBlur(img_cv, (5, 5), 0)

        # 6. Apply YOLOv8 to detect swimmers (humans) in the image
        results = model(img_blur)

        # Extract detection results
        boxes = results[0].boxes  # Get bounding boxes from the first result
        labels = results[0].names  # Get class names from the model
        confidences = boxes.conf  # Confidence scores of detections

        # 7. Draw bounding boxes on the image if confidence > threshold
        for box, conf in zip(boxes.xywh, confidences):
            if conf > 0:
                x, y, w, h = box  # Extract box coordinates (x, y, width, height)
                x1, y1, x2, y2 = int(x - w / 2), int(y - h / 2), int(x + w / 2), int(y + h / 2)

                # Draw a rectangle around the detected object
                cv2.rectangle(img_cv, (x1, y1), (x2, y2), (255, 0, 0), 2)

                # Annotate with the class label and confidence score
                label = f'{labels[int(boxes.cls[0])]}: {conf:.2f}'
                cv2.putText(img_cv, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

        # 8. Pose estimation using MediaPipe to detect keypoints of the swimmer
        image_rgb = cv2.cvtColor(img_cv, cv2.COLOR_BGR2RGB)

        # Perform pose estimation
        results_pose = pose.process(image_rgb)

        # 9. Extract and store keypoint coordinates, and draw keypoints on the image
        if results_pose.pose_landmarks:
            for idx, landmark in enumerate(results_pose.pose_landmarks.landmark):
                x = landmark.x  # X coordinate (normalized to [0, 1])
                y = landmark.y  # Y coordinate (normalized to [0, 1])
                visibility = landmark.visibility  # Visibility score

                # Convert normalized coordinates to pixel values
                h, w, _ = img_cv.shape
                x_pixel = int(x * w)
                y_pixel = int(y * h)

                # Draw keypoints on the image (in red)
                if visibility > 0:
                    cv2.circle(img_cv, (x_pixel, y_pixel), 5, (0, 0, 255), -1)

                # Write the keypoints to the CSV file
                with open(csv_file, 'a', newline='') as f:
                    writer = csv.writer(f)
                    writer.writerow([image_name, idx, x, y, visibility])

        # 10. Draw Pose Connections on the image (connecting keypoints)
        if results_pose.pose_landmarks:
            mp.solutions.drawing_utils.draw_landmarks(img_cv, results_pose.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        # 11. Save the final image with both bounding boxes (from YOLOv8) and pose keypoints to the "poses" folder
        pose_image_path = os.path.join(pose_folder, image_name)
        cv2.imwrite(pose_image_path, img_cv)




0: 384x640 (no detections), 89.9ms
Speed: 1.5ms preprocess, 89.9ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 320x640 1 person, 32.4ms
Speed: 3.5ms preprocess, 32.4ms inference, 1.2ms postprocess per image at shape (1, 3, 320, 640)


Saving the estimated frames in a zip file for future use


In [15]:
!zip -r Poses_frames.zip Poses/

  adding: Poses/ (stored 0%)
  adding: Poses/frame_0060.jpg (deflated 0%)
  adding: Poses/test_image.jpg (deflated 2%)


Converting the estimated frames to video

In [14]:
def frames_to_video(output_folder, output_video_path, fps=20):
    # Get all the frame filenames in sorted order
    frame_files = sorted([f for f in os.listdir(output_folder) if f.endswith('.jpg')])

    # Read the first frame to get dimensions
    first_frame = cv2.imread(os.path.join(output_folder, frame_files[0]))
    height, width, _ = first_frame.shape

    # Initialize video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # You can use other codecs too
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    # Write frames to video
    for frame_file in frame_files:
        frame_path = os.path.join(output_folder, frame_file)
        frame = cv2.imread(frame_path)
        out.write(frame)

    out.release()
    print(f"Video saved to {output_video_path}")


output_video_path = 'pose_video.mp4'
frames_to_video(pose_folder, output_video_path, fps=15)


Video saved to pose_video.mp4
