Check that the GPU drivers and CUDA are available (a GPU gives much better performance)

In [None]:
# Run the NVIDIA command-line tool to confirm that a driver is installed and a GPU is visible.
!nvidia-smi

In [None]:
import torch

# Ask PyTorch once whether a CUDA device is usable, then report the result.
has_cuda = torch.cuda.is_available()
print("CUDA available:", has_cuda)

if not has_cuda:
    print("No GPU detected. Please check your driver/CUDA setup.")
else:
    # Device index 0 is the first CUDA device PyTorch can see.
    print("GPU name:", torch.cuda.get_device_name(0))

Install required libraries

In [None]:
!pip install tensorflow==2.17.0
!pip install autodistill
!pip install autodistill-grounded-sam
!pip install autodistill-yolov8
!pip install supervision==0.24.0
!pip install scikit-learn
!pip install roboflow
!pip install opencv-python
!pip install ultralytics

Training video data

In [None]:
# Paths of the training videos to extract frames from (add more entries to use several videos)
TRAIN_VIDEO_PATHS = ["Train_Vid.mp4"]

# Keep every FRAME_STRIDE-th frame of each video (passed as `stride` to the frame generator)
FRAME_STRIDE = 10

Folder names and preview settings

In [None]:
# Folder where the frames extracted from the videos are saved
IMAGE_DIR_PATH = "Images"

# Folder where the labeled dataset is written
DATASET_DIR_PATH = "Dataset"

# Preview settings: number of images to show, grid layout, and figure size
# (passed as `grid_size` and `size` to sv.plot_images_grid)
SAMPLE_SIZE = 25
SAMPLE_GRID_SIZE = (5, 5)
SAMPLE_PLOT_SIZE = (15, 15)

Split the videos into frames

In [None]:
import supervision as sv
from tqdm import tqdm
from pathlib import Path

# Save every FRAME_STRIDE-th frame of each training video into IMAGE_DIR_PATH.
for video_path in tqdm(TRAIN_VIDEO_PATHS):
    video_path = Path(video_path)
    video_name = video_path.stem
    # Frames are named "<video stem>_<index>.png", so several videos can share one folder.
    image_name_pattern = video_name + "_{:01d}.png"
    with sv.ImageSink(target_dir_path=IMAGE_DIR_PATH, image_name_pattern=image_name_pattern) as sink:
        for image in sv.get_video_frames_generator(source_path=str(video_path), stride=FRAME_STRIDE):
            sink.save_image(image=image)

# Collect the extracted frames. The original list named "jpg" twice, so ".jpeg"
# files were never matched; the result is sorted so previews are deterministic.
image_paths = sorted(sv.list_files_with_extensions(
    directory=IMAGE_DIR_PATH,
    extensions=["png", "jpg", "jpeg"]))

print('image count:', len(image_paths))

View a sample of the extracted frames

In [None]:
import cv2
import supervision as sv

# Show the first SAMPLE_SIZE extracted frames in a grid, each titled with its file stem.
sample_paths = image_paths[:SAMPLE_SIZE]
titles = [path.stem for path in sample_paths]
images = [cv2.imread(str(path)) for path in sample_paths]

sv.plot_images_grid(
    images=images,
    titles=titles,
    grid_size=SAMPLE_GRID_SIZE,
    size=SAMPLE_PLOT_SIZE,
)

Labeling initialization

In [None]:
from autodistill.detection import CaptionOntology

# Ontology used for auto-labeling.
# NOTE(review): in autodistill's CaptionOntology each key is presumably the text prompt
# given to the base model and each value the class name written to the dataset —
# confirm against the autodistill documentation.
ontology=CaptionOntology({
    "person": "person",
    "bag": "bag",
    "cycle": "cycle"
})

Annotation and Labeling

In [None]:
from autodistill_grounded_sam import GroundedSAM

# Build the GroundedSAM base model from the ontology and label every ".png" file
# in IMAGE_DIR_PATH; the output folder is DATASET_DIR_PATH.
base_model = GroundedSAM(ontology=ontology)
dataset = base_model.label(
    input_folder=IMAGE_DIR_PATH,
    extension=".png",
    output_folder=DATASET_DIR_PATH)

Delete folders that are no longer needed

In [None]:
import shutil
import os

# Folders to delete: two sub-folders of the dataset directory and the extracted-frames
# folder. The paths are built from the configuration constants so that changing
# DATASET_DIR_PATH or IMAGE_DIR_PATH does not leave this cell pointing at stale names.
folder_paths = [
    os.path.join(DATASET_DIR_PATH, "annotations"),
    os.path.join(DATASET_DIR_PATH, "images"),
    IMAGE_DIR_PATH,
]

# shutil.rmtree removes each folder and everything inside it; this cannot be undone.
for folder in folder_paths:
    if os.path.exists(folder):
        shutil.rmtree(folder)
        print(f"Deleted: {folder}")
    else:
        print(f"Not found: {folder}")


Delete unlabeled images (images whose label file is empty)

In [None]:
import os

def remove_empty_labels(base_path, splits=("train", "valid"), image_extensions=(".jpg", ".jpeg", ".png")):
    """Delete empty label files, and the images they belong to, from a dataset.

    For every split, each zero-byte ``.txt`` file in ``<base_path>/<split>/labels``
    is removed together with every image in ``<base_path>/<split>/images`` that
    has the same stem and one of ``image_extensions``.

    Args:
        base_path: Root folder of the dataset.
        splits: Names of the split sub-folders to clean.
        image_extensions: Image file extensions (with leading dot) to look for.

    Returns:
        None. Progress is reported with ``print``.
    """
    for split in splits:
        labels_dir = os.path.join(base_path, split, "labels")
        images_dir = os.path.join(base_path, split, "images")

        # A missing split is reported and skipped instead of raising FileNotFoundError.
        if not os.path.isdir(labels_dir):
            print(f"Label directory does not exist: {labels_dir}")
            continue

        for label_file in sorted(os.listdir(labels_dir)):
            # Only label files are candidates; other zero-byte files are left alone.
            if not label_file.endswith(".txt"):
                continue

            label_path = os.path.join(labels_dir, label_file)
            if os.path.getsize(label_path) != 0:
                continue

            # Delete label file
            os.remove(label_path)
            print(f"Deleted empty label: {label_path}")

            # Delete every matching image, so no orphan is left behind when the
            # same stem exists with more than one extension.
            stem = os.path.splitext(label_file)[0]
            image_found = False
            for extension in image_extensions:
                image_path = os.path.join(images_dir, stem + extension)
                if os.path.exists(image_path):
                    os.remove(image_path)
                    print(f"Deleted corresponding image: {image_path}")
                    image_found = True

            if not image_found:
                print(f"⚠️ Corresponding image not found for label: {label_path}")

# Use the configured dataset folder instead of repeating its name as a literal.
remove_empty_labels(DATASET_DIR_PATH)


Show a sample of the labeled training frames (segmentation)

In [None]:
# Locations inside the dataset folder used by the preview cells. They are derived from
# DATASET_DIR_PATH so that renaming the dataset folder needs a change in one place only.
ANNOTATIONS_DIRECTORY_PATH = f"{DATASET_DIR_PATH}/train/labels"
IMAGES_DIRECTORY_PATH = f"{DATASET_DIR_PATH}/train/images"
DATA_YAML_PATH = f"{DATASET_DIR_PATH}/data.yaml"

In [None]:
import supervision as sv
from pathlib import Path

# Load the training images and their YOLO label files as a supervision dataset.
dataset = sv.DetectionDataset.from_yolo(
    data_yaml_path=DATA_YAML_PATH,
    images_directory_path=IMAGES_DIRECTORY_PATH,
    annotations_directory_path=ANNOTATIONS_DIRECTORY_PATH,
)

mask_annotator = sv.MaskAnnotator()
box_annotator = sv.BoxAnnotator()
label_annotator = sv.LabelAnnotator()

images, image_names = [], []
for index, (frame_path, frame, detections) in enumerate(dataset):
    # Stop once SAMPLE_SIZE frames have been collected.
    if index == SAMPLE_SIZE:
        break

    # Draw on a copy, applying the annotators in order: masks, boxes, labels.
    annotated_image = frame.copy()
    for annotator in (mask_annotator, box_annotator, label_annotator):
        annotated_image = annotator.annotate(
            scene=annotated_image, detections=detections)

    image_names.append(Path(frame_path).name)
    images.append(annotated_image)

sv.plot_images_grid(
    images=images,
    titles=image_names,
    grid_size=SAMPLE_GRID_SIZE,
    size=SAMPLE_PLOT_SIZE,
)

Split the dataset into train, valid, and test sets

In [None]:
import os
import random
import shutil

def create_test_split(base_path, test_ratio=0.1, seed=42):
    """Move a random fraction of the training set into a new test split.

    Images are sampled from ``<base_path>/train/images`` and moved to
    ``<base_path>/test/images``; the ``.txt`` label with the same stem is moved
    from ``train/labels`` to ``test/labels`` when it exists.

    Args:
        base_path: Root folder of the dataset.
        test_ratio: Fraction of the training images to move, between 0 and 1.
        seed: Seed for the random selection. With the same seed and the same
            training files the same images are chosen; pass ``None`` for a
            different selection on every call.

    Returns:
        None. Progress is reported with ``print``.

    Raises:
        ValueError: If ``test_ratio`` is outside the range [0, 1].
    """
    if not 0 <= test_ratio <= 1:
        raise ValueError(f"test_ratio must be between 0 and 1, got {test_ratio}")

    train_images_dir = os.path.join(base_path, "train", "images")
    train_labels_dir = os.path.join(base_path, "train", "labels")
    test_images_dir = os.path.join(base_path, "test", "images")
    test_labels_dir = os.path.join(base_path, "test", "labels")

    # List the training images before creating anything, so a missing train folder
    # does not leave empty test folders behind. os.listdir returns entries in
    # arbitrary order, so the list is sorted to make the seeded sample reproducible;
    # sub-directories are ignored.
    train_images = sorted(
        name for name in os.listdir(train_images_dir)
        if os.path.isfile(os.path.join(train_images_dir, name))
    )
    num_test = int(len(train_images) * test_ratio)

    os.makedirs(test_images_dir, exist_ok=True)
    os.makedirs(test_labels_dir, exist_ok=True)

    # A private generator keeps the selection independent of the global random state.
    test_images = random.Random(seed).sample(train_images, num_test)

    for img_name in test_images:
        label_name = os.path.splitext(img_name)[0] + ".txt"

        # Source paths
        src_img = os.path.join(train_images_dir, img_name)
        src_lbl = os.path.join(train_labels_dir, label_name)

        # Destination paths
        dst_img = os.path.join(test_images_dir, img_name)
        dst_lbl = os.path.join(test_labels_dir, label_name)

        # Move image and label files
        shutil.move(src_img, dst_img)
        if os.path.exists(src_lbl):
            shutil.move(src_lbl, dst_lbl)
        else:
            print(f"Label file not found for image: {img_name}")

    print(f"Created test split with {num_test} samples moved from train to test.")

# Use the configured dataset folder instead of repeating its name as a literal.
create_test_split(DATASET_DIR_PATH, test_ratio=0.1)

Update the data.yaml file by adding the test folder path

In [None]:
from pathlib import Path
import yaml

# Register the test split in the dataset's data.yaml.
base_path = Path("Dataset")
yaml_path = base_path / "data.yaml"

# Read the current configuration.
with open(yaml_path, 'r') as file:
    data = yaml.safe_load(file)

# Absolute path of the test images. From the first occurrence of "Dataset" onwards,
# backslashes are replaced with forward slashes; whatever precedes it is left untouched.
test_path = (base_path / "test" / "images").resolve()
test_path_str = str(test_path)
head, marker, tail = test_path_str.partition("Dataset")
if marker:
    test_path_str = head + (marker + tail).replace("\\", "/")

data['test'] = test_path_str

# Write the configuration back in block (non-flow) style.
with open(yaml_path, 'w') as file:
    yaml.dump(data, file, default_flow_style=False)

print("Added 'test' path with mixed separators to YAML file.")


Convert labels to YOLO bounding box format

In [None]:
import os
from glob import glob

def polygon_to_bbox(coords):
    """Return the axis-aligned bounding box of a flat polygon coordinate list.

    Args:
        coords: Flat sequence ``[x1, y1, x2, y2, ...]``; values at even indexes
            are read as x and values at odd indexes as y.

    Returns:
        Tuple ``(x_center, y_center, width, height)`` in the units of ``coords``.
    """
    x_values, y_values = coords[::2], coords[1::2]
    left, right = min(x_values), max(x_values)
    top, bottom = min(y_values), max(y_values)
    return (left + right) / 2, (top + bottom) / 2, right - left, bottom - top

def convert_labels_to_yolo_format(base_path):
    """Rewrite polygon label files in place as YOLO bounding-box labels.

    Every ``.txt`` file in the ``train``, ``valid`` and ``test`` label folders under
    ``base_path`` is read line by line. A line ``class x1 y1 x2 y2 ...`` is replaced
    by ``class x_center y_center width height`` computed with ``polygon_to_bbox``.

    The conversion is safe to run more than once: a line with exactly five fields
    is taken to be a bounding box already and is kept as it is. (This assumes the
    input never contains two-point polygons, which have the same field count.)
    Blank lines are dropped silently; lines with an even field count, fewer than
    three fields, or non-numeric coordinates are reported and dropped.

    Args:
        base_path: Root folder of the dataset.

    Returns:
        None. Progress is reported with ``print``.
    """
    label_dirs = [os.path.join(base_path, split, "labels") for split in ["train", "valid", "test"]]
    for label_dir in label_dirs:
        if not os.path.exists(label_dir):
            print(f"Label directory does not exist: {label_dir}")
            continue

        label_files = glob(os.path.join(label_dir, "*.txt"))
        print(f"Processing {len(label_files)} labels in {label_dir}")

        for lbl_file in label_files:
            with open(lbl_file, "r") as f:
                lines = f.readlines()

            new_lines = []
            for line in lines:
                parts = line.split()
                if not parts:
                    # Blank line: nothing to convert and nothing worth reporting.
                    continue

                if len(parts) == 5:
                    # Already "class x_center y_center width height". Converting it again
                    # would read the box as a two-point polygon and corrupt it.
                    new_lines.append(" ".join(parts) + "\n")
                    continue

                if len(parts) < 3 or len(parts) % 2 == 0:
                    print(f"Skipping malformed line in {lbl_file}: {line.strip()}")
                    continue

                class_id = parts[0]
                try:
                    coords = [float(value) for value in parts[1:]]
                except ValueError:
                    # Non-numeric coordinates are treated like any other malformed line.
                    print(f"Skipping malformed line in {lbl_file}: {line.strip()}")
                    continue

                # Convert polygon to bounding box
                x_c, y_c, w, h = polygon_to_bbox(coords)
                new_lines.append(f"{class_id} {x_c:.6f} {y_c:.6f} {w:.6f} {h:.6f}\n")

            # Overwrite label file with YOLO bounding box format
            with open(lbl_file, "w") as f:
                f.writelines(new_lines)

    print("Conversion to YOLO bounding boxes completed.")

# Use the configured dataset folder instead of repeating its name as a literal.
convert_labels_to_yolo_format(DATASET_DIR_PATH)


Show a sample of the labeled frames (YOLO bounding boxes)

In [None]:
import supervision as sv
from pathlib import Path

# Reload the training split. This cell is meant to run after the labels have been
# converted to bounding boxes, so the detections carry boxes only.
dataset = sv.DetectionDataset.from_yolo(
    images_directory_path=IMAGES_DIRECTORY_PATH,
    annotations_directory_path=ANNOTATIONS_DIRECTORY_PATH,
    data_yaml_path=DATA_YAML_PATH)

# No mask annotator here: with box-only labels there are no masks to draw, so the
# extra pass over every image was a no-op.
box_annotator = sv.BoxAnnotator()
label_annotator = sv.LabelAnnotator()

images = []
image_names = []
for i, (image_path, image, annotation) in enumerate(dataset):
    # Stop once SAMPLE_SIZE frames have been collected.
    if i == SAMPLE_SIZE:
        break
    # Draw on a copy so the image held by the dataset is not modified.
    annotated_image = image.copy()
    annotated_image = box_annotator.annotate(
        scene=annotated_image, detections=annotation)
    annotated_image = label_annotator.annotate(
        scene=annotated_image, detections=annotation)

    image_names.append(Path(image_path).name)
    images.append(annotated_image)

sv.plot_images_grid(
    images=images,
    titles=image_names,
    grid_size=SAMPLE_GRID_SIZE,
    size=SAMPLE_PLOT_SIZE)

Save a sample of the labeled frames (YOLO bounding boxes)

In [None]:
import os
import random
import cv2
from glob import glob

def draw_yolo_boxes_on_image(image_path, label_path):
    """Draw the boxes of a YOLO label file onto an image.

    Each label line is expected to hold ``class x_center y_center width height``
    with coordinates expressed as fractions of the image size. Lines that do not
    have exactly five numeric fields are skipped.

    Args:
        image_path: Path of the image to read.
        label_path: Path of the label file; it may be missing.

    Returns:
        The image with a green rectangle and the class id drawn for every box,
        the unmodified image if the label file does not exist, or ``None`` if the
        image cannot be read.
    """
    img = cv2.imread(image_path)
    if img is None:
        print(f"Failed to read image: {image_path}")
        return None

    h, w = img.shape[:2]
    if not os.path.exists(label_path):
        return img

    with open(label_path, "r") as f:
        for line in f:
            parts = line.split()
            if len(parts) != 5:
                continue
            try:
                class_id, x_c, y_c, bw, bh = map(float, parts)
            except ValueError:
                # Non-numeric fields: skip the line, like any other malformed line.
                continue

            # Compute the corners in fractional coordinates and round once. Truncating
            # the centre and the size separately could shift a corner by a pixel or two.
            x1 = int(round((x_c - bw / 2) * w))
            y1 = int(round((y_c - bh / 2) * h))
            x2 = int(round((x_c + bw / 2) * w))
            y2 = int(round((y_c + bh / 2) * h))
            cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Put the class id just above the box; when the box touches the top edge
            # there is no room above it, so draw the text just inside the box instead.
            text_x = max(x1, 0)
            text_y = y1 - 5 if y1 - 5 >= 15 else y1 + 15
            cv2.putText(img, str(int(class_id)), (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    return img

def save_random_bbox_previews(base_path, output_dir="preview_sample", count=10):
    """Save preview images with their YOLO boxes drawn on them.

    Up to ``count`` ``.jpg`` images are picked at random from the ``train``,
    ``valid`` and ``test`` image folders under ``base_path``, annotated with
    ``draw_yolo_boxes_on_image`` and written to ``output_dir`` as
    ``preview_<index>.jpg``.

    Args:
        base_path: Root folder of the dataset.
        output_dir: Folder for the preview images; created if needed.
        count: Maximum number of previews to save.

    Returns:
        None. Progress is reported with ``print``.
    """
    os.makedirs(output_dir, exist_ok=True)

    all_img_paths = []
    for split in ["train", "valid", "test"]:
        img_dir = os.path.join(base_path, split, "images")
        if os.path.exists(img_dir):
            all_img_paths.extend(glob(os.path.join(img_dir, "*.jpg")))

    if len(all_img_paths) == 0:
        print("No images found.")
        return

    sample_imgs = random.sample(all_img_paths, min(count, len(all_img_paths)))

    saved = 0
    for i, img_path in enumerate(sample_imgs):
        # Build the label path from the path components. A plain
        # str.replace("images", "labels") would also rewrite "images" (or ".jpg")
        # wherever else it occurs in the dataset path or the file name.
        split_dir = os.path.dirname(os.path.dirname(img_path))
        stem = os.path.splitext(os.path.basename(img_path))[0]
        label_path = os.path.join(split_dir, "labels", stem + ".txt")

        preview_img = draw_yolo_boxes_on_image(img_path, label_path)
        if preview_img is None:
            continue

        output_path = os.path.join(output_dir, f"preview_{i}.jpg")
        # cv2.imwrite returns False on failure; only count files actually written.
        if cv2.imwrite(output_path, preview_img):
            saved += 1

    print(f"Saved {saved} preview images with bounding boxes in: {output_dir}")

# Use the configured dataset folder instead of repeating its name as a literal.
save_random_bbox_previews(DATASET_DIR_PATH)


Fine-tune a YOLO model using the annotated data

In [None]:
from ultralytics import YOLO
import torch

# Load a pretrained YOLO model (yolov8n.pt, yolov8s.pt, yolov8m.pt, ...) as needed
model = YOLO("yolov8n.pt")

# Train the model on the labeled dataset.
# The dataset config is referenced through DATA_YAML_PATH, which uses forward slashes:
# the previous literal "Dataset\\data.yaml" only resolved on Windows.
# NOTE(review): imgsz=360 is not a multiple of 32; Ultralytics is expected to adjust it
# to a stride multiple with a warning — confirm, or set a multiple of 32 explicitly.
model.train(
    data=DATA_YAML_PATH,            # Path to your dataset configuration file
    epochs=50,                      # Total number of training epochs
    imgsz=360,                      # Image size
    batch=16,                       # Batch size
    name="Trained_Model",           # Name for the training run
    workers=0                       # Number of dataloader worker processes
)
