In [1]:
%%writefile modularized/setup_and_installation.py
import subprocess

def setup_and_install():
    # Install necessary packages
    packages = [
        "git+https://github.com/THU-MIG/yolov10.git",
        "supervision",
        "roboflow"
    ]

    for package in packages:
        subprocess.run(["pip", "install", package], check=True)

    print("Setup and installation complete.")


Overwriting modularized/setup_and_installation.py


In [2]:
%%writefile modularized/download_weights.py
import os
import requests

def download_weights():
    weights_dir = os.path.join("weights")
    os.makedirs(weights_dir, exist_ok=True)

    weights_urls = [
        "https://github.com/jameslahm/yolov10/releases/download/v1.0/yolov10n.pt",
        "https://github.com/jameslahm/yolov10/releases/download/v1.0/yolov10s.pt",
        "https://github.com/jameslahm/yolov10/releases/download/v1.0/yolov10m.pt",
        "https://github.com/jameslahm/yolov10/releases/download/v1.0/yolov10b.pt",
        "https://github.com/jameslahm/yolov10/releases/download/v1.0/yolov10x.pt",
        "https://github.com/jameslahm/yolov10/releases/download/v1.0/yolov10l.pt"
    ]

    def download_file(url, dest_folder):
        filename = os.path.join(dest_folder, url.split('/')[-1])
        if not os.path.exists(filename):
            with requests.get(url, stream=True) as r:
                r.raise_for_status()
                with open(filename, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            print(f"Downloaded: {filename}")
        else:
            print(f"File already exists: {filename}")

    for url in weights_urls:
        download_file(url, weights_dir)


Overwriting modularized/download_weights.py


In [3]:
%%writefile modularized/initialize_model.py
import torch
from ultralytics import YOLOv10
import os

def initialize_model(weights_dir):
    print(torch.__version__)
    print("CUDA available: ", torch.cuda.is_available())
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    model = YOLOv10(os.path.join(weights_dir, "yolov10n.pt"))
    print("yolov10 preset classes = ", model.names)
    return model, device


Overwriting modularized/initialize_model.py


In [4]:
%%writefile modularized/download_dataset.py
import yaml
from roboflow import Roboflow
import os

def download_dataset():
    rf = Roboflow(api_key="YOUR_API_KEY")
    project = rf.workspace("basketball-formations").project("basketball-and-hoop-7xk0h")
    version = project.version(11)
    dataset = version.download("yolov8")

    dataset_location = dataset.location
    data_yaml_path = os.path.join(dataset_location, "data.yaml")

    with open(data_yaml_path, 'r') as file:
        data_yaml = yaml.safe_load(file)
    data_yaml['train'] = '../train/images'
    data_yaml['val'] = '../valid/images'
    with open(data_yaml_path, 'w') as file:
        yaml.safe_dump(data_yaml, file)

    class_names = data_yaml['names']
    print("my roboflow classes = ", class_names)
    return dataset_location, data_yaml_path, class_names


Overwriting modularized/download_dataset.py


In [5]:
%%writefile modularized/inference_pretrained.py
import cv2
import supervision as sv

def inference_pretrained(model, image_path):
    image = cv2.imread(image_path)
    results = model(image)[0]
    detections = sv.Detections.from_ultralytics(results)

    bounding_box_annotator = sv.BoundingBoxAnnotator()
    label_annotator = sv.LabelAnnotator()
    annotated_image = bounding_box_annotator.annotate(scene=image, detections=detections)
    annotated_image = label_annotator.annotate(scene=annotated_image, detections=detections)
    sv.plot_image(annotated_image)


Overwriting modularized/inference_pretrained.py


In [6]:
%%writefile modularized/custom_training.py
import os
import shutil

def custom_training(weights_dir, data_yaml_path, device, epochs, batch_size):
    runs_dir = os.path.join("runs")
    if os.path.exists(runs_dir):
        shutil.rmtree(runs_dir)
        print(f"Removed previous run files in {runs_dir}")

    os.system(f'yolo task=detect mode=train epochs={epochs} batch={batch_size} plots=True model={os.path.join(weights_dir, "yolov10x.pt")} data={data_yaml_path} device={device}')


Overwriting modularized/custom_training.py


In [7]:
%%writefile modularized/post_training_results.py
import os
from IPython.display import display, Image

def post_training_results():
    results_dir = os.path.join("runs", "detect", "train")
    best_model_path = os.path.join(results_dir, "weights", "best.pt")
    if os.path.exists(results_dir):
        display(Image(filename=os.path.join(results_dir, "confusion_matrix.png"), width=600))
        display(Image(filename=os.path.join(results_dir, "results.png"), width=600))
        
        print("Trained model files:")
        for file_name in os.listdir(os.path.join(results_dir, "weights")):
            if file_name.endswith(".pt"):
                print(file_name)
    return best_model_path



Overwriting modularized/post_training_results.py


In [8]:
%%writefile modularized/download_video.py
import os
from pytube import YouTube
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
from moviepy.editor import VideoFileClip

def download_video(yt_video_url):
    HOME = os.getcwd()
    video_path = os.path.join(HOME, 'hq_luka.mp4')

    yt = YouTube(yt_video_url)

    print("Available streams:")
    for stream in yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution'):
        print(stream)

    stream = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first()
    stream.download(output_path=HOME, filename='hq_luka.mp4')
    print(f"Downloaded video: {video_path}")

    video = VideoFileClip(video_path)
    duration = video.duration

    start_time = 174  # 2 minutes and 54 seconds
    output_path = os.path.join(HOME, 'hq_luka_cut.mp4')

    ffmpeg_extract_subclip(video_path, start_time, duration, targetname=output_path)
    print(f"Cut video saved to: {output_path}")
    return output_path



Overwriting modularized/download_video.py


In [9]:
%%writefile modularized/object_tracking_scoring.py
import os
import cv2
import supervision as sv
from ultralytics import YOLOv10
import numpy as np

def object_tracking_scoring(video_path, best_model_path):
    # Load the best trained model
    trained_model = YOLOv10(best_model_path)
    
    # Initialize trackers and annotators
    tracker = sv.ByteTrack()
    bounding_box_annotator = sv.BoundingBoxAnnotator()
    label_annotator = sv.LabelAnnotator()
    trace_annotator = sv.TraceAnnotator()

    # Initialize score and intersection tracking
    score = 0
    intersected_basketballs = set()

    # Function to check intersection
    def check_intersection(basketball_box, hoop_box):
        x1_b, y1_b, x2_b, y2_b = basketball_box
        x1_h, y1_h, x2_h, y2_h = hoop_box
        
        # Check if the boxes intersect
        if x1_b < x2_h and x2_b > x1_h and y1_b < y2_h and y2_b > y1_h:
            return True
        return False

    # Callback function for processing each frame
    def callback(frame: np.ndarray, _: int) -> np.ndarray:
        nonlocal score, intersected_basketballs
        results = trained_model(frame)
        print("Results type:", type(results))
        print("Results:", results)
        
        # Extract results from the list
        results = results[0]
        
        # Convert the results to Detections
        detections = sv.Detections.from_ultralytics(results)
        
        # Update detections with tracker
        detections = tracker.update_with_detections(detections)
        
        # Generate labels
        labels = [
            f"#{tracker_id} {trained_model.names[class_id]}"
            for class_id, tracker_id in zip(detections.class_id, detections.tracker_id)
        ]
        
        # Ensure that labels list matches the number of detections
        if len(labels) != len(detections):
            print(f"Warning: Number of labels ({len(labels)}) does not match number of detections ({len(detections)})")
            return frame  # Return the original frame if there's a mismatch

        # Get bounding boxes and tracker IDs for basketball and hoop
        basketball_boxes = [(box, tracker_id) for box, class_id, tracker_id in zip(detections.xyxy, detections.class_id, detections.tracker_id) if class_id == 1]
        hoop_boxes = [(box, tracker_id) for box, class_id, tracker_id in zip(detections.xyxy, detections.class_id, detections.tracker_id) if class_id == 2]
        
        # Check for intersection and update score
        for basketball_box, basketball_id in basketball_boxes:
            for hoop_box, hoop_id in hoop_boxes:
                if check_intersection(basketball_box, hoop_box):
                    if basketball_id not in intersected_basketballs:
                        score += 1
                        intersected_basketballs.add(basketball_id)

        # Annotate frame with bounding boxes and labels
        annotated_frame = bounding_box_annotator.annotate(frame.copy(), detections=detections)
        annotated_frame = label_annotator.annotate(annotated_frame, detections=detections, labels=labels)

        # Overlay the score on the frame
        cv2.putText(annotated_frame, f"Score: {score}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
        
        return trace_annotator.annotate(annotated_frame, detections=detections)

    # Process video with tracking and scoring using the updated callback
    sv.process_video(source_path=video_path, target_path=os.path.join("result.mp4"), callback=callback)


Overwriting modularized/object_tracking_scoring.py


In [10]:
%%writefile main.py
import argparse
from modularized.setup_and_installation import setup_and_install
from modularized.download_weights import download_weights
from modularized.initialize_model import initialize_model
from modularized.download_dataset import download_dataset
from modularized.inference_pretrained import inference_pretrained
from modularized.custom_training import custom_training
from modularized.post_training_results import post_training_results
from modularized.download_video import download_video
from modularized.object_tracking_scoring import object_tracking_scoring

def main(args):
    # Setup and Installation
    setup_and_install()
    
    # Download YOLOv10 Weights
    download_weights()
    
    # Initialize YOLOv10 Model
    weights_dir = "weights"
    model, device = initialize_model(weights_dir)
    
    # Download Dataset from Roboflow
    dataset_location, data_yaml_path, class_names = download_dataset()
    
    # Inference with Pre-trained Model
    inference_pretrained(model, args.image_path)
    
    # Custom Training
    custom_training(weights_dir, data_yaml_path, device, args.epochs, args.batch_size)
    
    # Post-training Results and Visualization
    best_model_path = post_training_results()
    
    # Download High-Quality Video from YouTube
    video_path = download_video(args.yt_video_url)
    
    # Object Tracking and Scoring
    object_tracking_scoring(video_path, best_model_path)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="YOLOv10 Training and Inference Pipeline")
    parser.add_argument("--epochs", type=int, default=25, help="Number of epochs for training")
    parser.add_argument("--batch_size", type=int, default=8, help="Batch size for training")
    parser.add_argument("--image_path", type=str, default="your_image.png", help="Path to the image for inference")
    parser.add_argument("--yt_video_url", type=str, default="https://www.youtube.com/watch?v=KyC6wr4I2VU", help="URL of the YouTube video for object tracking and scoring")
    args = parser.parse_args()
    main(args)



Writing main.py


In [11]:
#Example usage


!python main.py --epochs 5 --batch_size 16 --image_path path/to/your_image.png --yt_video_url https://www.youtube.com/watch?v=example_video_url


Collecting git+https://github.com/THU-MIG/yolov10.git
  Cloning https://github.com/THU-MIG/yolov10.git to c:\users\ghadf\appdata\local\temp\pip-req-build-k47tvxov
  Resolved https://github.com/THU-MIG/yolov10.git to commit d8777c1449509366ef3fa4892afab9b6f2880dbf
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Setup and installation complete.
File already exists: weights\yolov10n.pt
File already exists: weights\yolov10s.pt
File already exists: weights\yolov10m.pt
File already exists: weights\yolov10b.pt
File already exists: weights\yolov10x.pt
File already exists: weights\yolov10l.pt
2.3.1+cu121
CUDA available:  True
Using device: cuda
yolov10 preset classes =  {0: '0', 1: '1', 2: '2', 3: '3', 4: '4', 5: '5',

  Running command git clone --filter=blob:none --quiet https://github.com/THU-MIG/yolov10.git 'C:\Users\ghadf\AppData\Local\Temp\pip-req-build-k47tvxov'
Traceback (most recent call last):
  File "c:\Users\ghadf\vscode_projects\venv_projects\Pytorch\YOLO_exploration\main.py", line 48, in <module>
    main(args)
  File "c:\Users\ghadf\vscode_projects\venv_projects\Pytorch\YOLO_exploration\main.py", line 24, in main
    dataset_location, data_yaml_path, class_names = download_dataset()
                                                    ^^^^^^^^^^^^^^^^^^
  File "c:\Users\ghadf\vscode_projects\venv_projects\Pytorch\YOLO_exploration\modularized\download_dataset.py", line 7, in download_dataset
    project = rf.workspace("basketball-formations").project("basketball-and-hoop-7xk0h")
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\ghadf\vscode_projects\venv_projects\Pytorch\YOLO_exploration\.venv\Lib\site-packages\roboflow\__init__.py", line 259, in workspace
    list_pro