Requirements:
* must have roboflow dataset
* must use coco_segmentation 

Steps:
1) Load in CUDA
2) load in Pytorch documents
3) create coco custom image folder
4) load in roboflow image set
5) get classes out of it and the number of classes
6) load in the model (R-CNN)
7) Load in roboflow set into custom coco folder
8) input them into dataloaders
9) train model
10) save model
11) load model
12) process a video from youtube utilizing the model


In [2]:
import torch
from torch import nn

# Note: this notebook requires torch >= 1.10.0
print(torch.__version__)
print("CUDA available: ", torch.cuda.is_available())


# Setup device-agnostic code
device = "cuda" if torch.cuda.is_available() else "cpu"
device

2.2.0+cu121
CUDA available:  True


'cuda'

In [3]:
import os

def create_directory(dir_path):
    """Create a directory if it does not exist."""
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)

#create going_modular repository
create_directory("going_modular")

In [4]:
# Download required files from torchvision
import requests

def download_files(urls):
    for url in urls:
        response = requests.get(url)
        if response.status_code == 200:
            with open(url.split("/")[-1], 'wb') as file:
                file.write(response.content)
        else:
            print(f"Failed to download {url}. Status code: {response.status_code}")

urls = [
    "https://raw.githubusercontent.com/pytorch/vision/main/references/detection/engine.py",
    "https://raw.githubusercontent.com/pytorch/vision/main/references/detection/utils.py",
    "https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_utils.py",
    "https://raw.githubusercontent.com/pytorch/vision/main/references/detection/coco_eval.py",
    "https://raw.githubusercontent.com/pytorch/vision/main/references/detection/transforms.py"
]
download_files(urls)

In [5]:
%%writefile going_modular/utils.py
#!pip install roboflow


#from roboflow import Roboflow
#rf = Roboflow(api_key="htpcxp3XQh7SsgMfjJns")
#project = rf.workspace("ai-79z1a").project("basketball_child")
#dataset = project.version(6).download("coco-segmentation")


from roboflow import Roboflow
import torch
import requests
import yt_dlp
import utils
import shutil
import os
import argparse
import json
from pathlib import Path
import torch

def download_videos_from_youtube(video_urls, output_path):
    """
    Downloads videos from YouTube.

    Args:
    video_urls (list): List of YouTube video URLs.
    output_path (str): Directory where videos will be saved.

    Returns:
    tuple: A tuple containing lists of successful and failed downloads.
    """

    ydl_opts = {
        'format': 'best',
        'outtmpl': output_path + '/%(title)s.%(ext)s',
        'quiet': True
    }

    failed_downloads = []
    successful_downloads = []

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        for url in video_urls:
            try:
                ydl.download([url])
                print(f"Successfully downloaded {url}")
                successful_downloads.append(url)
            except Exception as e:
                print(f"Failed to download {url}: {e}")
                failed_downloads.append(url)

    return successful_downloads, failed_downloads


def get_device():
    return "cuda" if torch.cuda.is_available() else "cpu"


def get_project(api_key, workspace, project_name, version):
    rf = Roboflow(api_key=api_key)
    project = rf.workspace(workspace).project(project_name)
    dataset = project.version(version).download("coco-segmentation")
    return dataset

def download_files(urls):
    for url in urls:
        response = requests.get(url)
        if response.status_code == 200:
            with open(url.split("/")[-1], 'wb') as file:
                file.write(response.content)
        else:
            print(f"Failed to download {url}. Status code: {response.status_code}")


def construct_dataset_paths(project_name, version):
    base_path = f"{project_name}-{version}"
    train_annotation_path = f"{base_path}/train/_annotations.coco.json"
    valid_annotation_path = f"{base_path}/valid/_annotations.coco.json"
    test_annotation_path = f"{base_path}/test/_annotations.coco.json"

    train_root_dir = f"{base_path}/train"
    valid_root_dir = f"{base_path}/valid"
    test_root_dir = f"{base_path}/test"

    return train_annotation_path, valid_annotation_path, test_annotation_path, train_root_dir, valid_root_dir, test_root_dir

def create_directory(dir_path):
    """Create a directory if it does not exist."""
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)



def split_dataset(dataset, split_ratio=0.8):
    total_size = len(dataset)
    train_size = int(total_size * split_ratio)
    valid_size = total_size - train_size
    train_dataset, valid_dataset = random_split(dataset, [train_size, valid_size], generator=torch.Generator().manual_seed(42))
    return train_dataset, valid_dataset
        
def delete_folder_and_video(folder_path, video_path):
    """
    Delete the specified folder and video.
    
    Parameters:
    - folder_path: Path object or str, the path to the folder to delete.
    - video_path: Path object or str, the path to the video file to delete.
    """
    if folder_path.exists():
        shutil.rmtree(folder_path)
        print(f"Deleted folder: {folder_path}")
    else:
        print(f"Folder not found: {folder_path}")
    
    if video_path.exists():
        os.remove(video_path)
        print(f"Deleted video: {video_path}")
    else:
        print(f"Video not found: {video_path}")
        
def load_classes_from_json(file_path):
    """
    Loads the class names and their corresponding IDs from a COCO format JSON file.

    Args:
    file_path (str): Path to the JSON file.

    Returns:
    dict: A dictionary where keys are class IDs and values are class names.
    """
    with open(file_path) as f:
        data = json.load(f)
    categories = data['categories']
    classes = {category['id']: category['name'] for category in categories}
    return classes

Writing going_modular/utils.py


In [6]:
%%writefile going_modular/coco_dataset.py

import json
import os
import numpy as np
from PIL import Image, ImageDraw
import torch
from torchvision.transforms.v2 import functional as F
from torchvision import tv_tensors

class CustomCocoDataset(torch.utils.data.Dataset):
    def __init__(self, annotation_path, root_dir, transforms=None):
        self.root_dir = root_dir
        self.transforms = transforms

        with open(annotation_path) as f:
            self.annotations = json.load(f)

        # Filter out images without annotations
        annotated_images = []
        for img in self.annotations['images']:
            image_id = img['id']
            anns = [ann for ann in self.annotations['annotations'] if ann['image_id'] == image_id]
            if len(anns) > 0:
                annotated_images.append(img)

        self.image_ids = [img['id'] for img in annotated_images]

        # Update the self.annotations['images'] to include only annotated images
        self.annotations['images'] = annotated_images
        
        #print("Number of images:", len(self.annotations['images']))
        #print("Sample image entry:", self.annotations['images'][0])

    def __len__(self):
        return len(self.annotations['images'])

    def __getitem__(self, idx):
        img_info = self.annotations['images'][idx]
        image_id = img_info['id']
        
        img_path = os.path.join(self.root_dir, img_info['file_name'])
        img = Image.open(img_path).convert("RGB")
        img_tensor = F.to_tensor(img)
        #print("Image size (PIL):", img.size)
        #print("Image shape (tensor):", img_tensor.shape)

        anns = [ann for ann in self.annotations['annotations'] if ann['image_id'] == image_id]
        #print("Number of annotations for this image:", len(anns))

        boxes = [ann['bbox'] for ann in anns]  # bbox format: [x_min, y_min, width, height]
        # Convert from XYWH to XYXY format
        boxes = [[box[0], box[1], box[0] + box[2], box[1] + box[3]] for box in boxes]
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = [ann['category_id'] for ann in anns]
        labels = torch.as_tensor(labels, dtype=torch.int64)
        #print("Boxes shape:", boxes.shape)
        #print("Labels:", labels)
        # Debug print
        #print(f"Boxes shape for image {idx}: {boxes.shape}")

        masks = []
        for ann in anns:
            if 'segmentation' in ann and isinstance(ann['segmentation'], list):
                for seg in ann['segmentation']:
                    mask_img = Image.new('L', (img_info['width'], img_info['height']), 0)
                    ImageDraw.Draw(mask_img).polygon(seg, outline=1, fill=1)
                    mask = np.array(mask_img)
                    masks.append(mask)
        masks = torch.as_tensor(np.array(masks), dtype=torch.uint8) if masks else torch.zeros((0, img_info['height'], img_info['width']), dtype=torch.uint8)
        #print("Masks shape:", masks.shape)

        areas = [ann['area'] for ann in anns]
        areas = torch.as_tensor(areas, dtype=torch.float32)
        iscrowd = [ann['iscrowd'] for ann in anns]
        iscrowd = torch.as_tensor(iscrowd, dtype=torch.int64)

        # Convert masks to Mask format
        masks = tv_tensors.Mask(masks)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks
        target["image_id"] = image_id  # Changed to integer
        target["area"] = areas
        target["iscrowd"] = iscrowd

        #print("Target:", target)

        if self.transforms is not None:
            img_tensor, target = self.transforms(img_tensor, target)

        return img_tensor, target





Writing going_modular/coco_dataset.py


In [7]:
%%writefile going_modular/visualization_utils.py
import matplotlib.pyplot as plt
import torchvision.transforms.functional as F  # Add this import

# New function to visualize transformations
def visualize_transformation(dataset, idx):
    img, target = dataset[idx]
    transformed_img, transformed_target = dataset.transforms(img, target)
    original_img = F.to_pil_image(img)
    transformed_img = F.to_pil_image(transformed_img)

    plt.figure(figsize=(24, 6))
    # Original Image
    plt.subplot(1, 2, 1)
    plt.imshow(original_img)
    for box in target["boxes"]:
        x_min, y_min, x_max, y_max = box.tolist()
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, linewidth=2, edgecolor='r', facecolor='none')
        plt.gca().add_patch(rect)
        #print(x_min, y_min, x_max, y_max)
    plt.title(f"Original Image - ID: {idx}")

    # Transformed Image
    plt.subplot(1, 2, 2)
    plt.imshow(transformed_img)
    for box in transformed_target["boxes"]:
        x_min, y_min, x_max, y_max = box.tolist()
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, linewidth=2, edgecolor='b', facecolor='none')
        plt.gca().add_patch(rect)
        #print(x_min, y_min, x_max, y_max)
    plt.title(f"Transformed Image - ID: {idx}")
    plt.show()



def visualize_bbox(dataset, idx):
    img, target = dataset[idx]
    original_img = F.to_pil_image(img)

    plt.figure(figsize=(12, 6))
    plt.imshow(original_img)

    for box in target["boxes"]:  # Access the boxes directly
        x_min, y_min, x_max, y_max = box.tolist()
        # Debug print
        print(f"Visualizing BBox - xmin: {x_min}, ymin: {y_min}, xmax: {x_max}, ymax: {y_max}")
        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, linewidth=2, edgecolor='r', facecolor='none')
        plt.gca().add_patch(rect)

    plt.title(f"Image with Bounding Boxes - ID: {idx}")
    plt.show()


Writing going_modular/visualization_utils.py


In [8]:
%%writefile going_modular/model_utils.py
import json
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
import utils
import shutil
import os

def load_classes_from_json(file_path):
    """
    Loads the class names and their corresponding IDs from a COCO format JSON file.

    Args:
    file_path (str): Path to the JSON file.

    Returns:
    dict: A dictionary where keys are class IDs and values are class names.
    """
    with open(file_path) as f:
        data = json.load(f)
    categories = data['categories']
    classes = {category['id']: category['name'] for category in categories}
    return classes



# Usage example:
#classes = load_classes_from_json('basketball_child-6/test/_annotations.coco.json')
#print(classes)

# model_utils.py
def get_model_instance_segmentation(num_classes, hidden_layer=256):
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights="DEFAULT")
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask, hidden_layer, num_classes)
    return model




def upload_to_huggingface(model_directory, model_id):
    """Upload the model to Hugging Face Hub."""
    hf_api = HfApi()
    username = hf_api.whoami()['name']
    repo_name = f"{username}/{model_id}"
    repo_url = hf_api.create_repo(repo_name, exist_ok=True, private=False)

    repo = Repository(local_dir=model_directory, clone_from=repo_url, use_auth_token=True)
    repo.lfs_track(["*.bin", "*.pth", "*.ckpt"])  # Track large model files with Git LFS
    repo.git_add()
    repo.git_commit("Initial commit of the model")
    try:
        repo.git_push()
        print(f"Model successfully uploaded to: {repo_url}")
    except Exception as e:
        print(f"Failed to upload model to Hugging Face: {e}")
        


Writing going_modular/model_utils.py


In [9]:
%%writefile going_modular/transforms.py
import torch  # Add this import statement
from torchvision.transforms import v2 as T
from torchvision.transforms import Compose, RandomHorizontalFlip, ToTensor, ConvertImageDtype
from torchvision import transforms
from PIL import Image
from io import BytesIO

def get_transform(train):
    transforms = []
    #if train:
    #    transforms.append(T.RandomHorizontalFlip(0.5))
    transforms.append(T.ToDtype(torch.float, scale=True))
    transforms.append(T.ToPureTensor())
    return T.Compose(transforms)

def transform_image(image_bytes):
    """
    Transforms image bytes into a tensor with the correct format for the model.
    
    Args:
    image_bytes (bytes): The image in bytes format, as uploaded by the user.
    
    Returns:
    torch.Tensor: The transformed image as a tensor.
    """
    # Define the transformations
    my_transforms = transforms.Compose([
        transforms.Resize((224, 224)),  # Resize to the size required by your model
        transforms.ToTensor(),  # Convert the image to a tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406],  # Standard normalization for pre-trained models
                             std=[0.229, 0.224, 0.225])
    ])
    
    # Load the image from bytes and apply transformations
    image = Image.open(BytesIO(image_bytes))
    return my_transforms(image).unsqueeze(0)  # Add a batch dimension


Writing going_modular/transforms.py


In [10]:
%%writefile going_modular/engine.py
# train.py
import torch
import torchvision
from engine import train_one_epoch, evaluate
from coco_utils import get_coco_api_from_dataset
from coco_eval import CocoEvaluator

def train_model(model, data_loader, data_loader_valid, device, num_epochs,
                lr=0.005, momentum=0.9, weight_decay=0.0005, step_size=3, gamma=0.1):
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=lr, momentum=momentum, weight_decay=weight_decay)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

    for epoch in range(num_epochs):
        train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
        lr_scheduler.step()
        evaluate(model, data_loader_valid, device=device)

    #torch.save(model.state_dict(), 'results/models/model_weights.pth')


Writing going_modular/engine.py


In [11]:
%%writefile going_modular/process_video_check.py

import cv2
import torch
import numpy as np
from torchvision.transforms import v2 as T
from torchvision.utils import draw_bounding_boxes, draw_segmentation_masks


def intersects(box1, box2):
    x1_min, y1_min, x1_max, y1_max = box1.tolist()
    x2_min, y2_min, x2_max, y2_max = box2.tolist()
    return (x1_min < x2_max and x1_max > x2_min and y1_min < y2_max and y1_max > y2_min)

def process_video_check(video_path, model, device, classes, classes_to_track=None, threshold=0.5, check_intersections=False):
    model.eval()
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        print("Error opening video file")
        return

    score = 0  # Initialize the score counter

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_tensor = T.ToTensor()(frame).unsqueeze_(0).to(device)

        with torch.no_grad():
            prediction = model(frame_tensor)[0]

        pred_scores = prediction['scores']
        pred_boxes = prediction['boxes']
        pred_labels = prediction['labels']
        pred_masks = prediction['masks']

        keep = pred_scores > threshold
        pred_boxes = pred_boxes[keep]
        pred_labels = pred_labels[keep]
        pred_masks = pred_masks[keep]

        #print(f"Original Frame Size: {frame.shape}")
        #print(f"Pred Boxes before drawing: {pred_boxes}")

        if not keep.any():
            continue  # Skip this frame if no detections are kept

        # Convert numeric labels to class names
        pred_class_names = [classes[label.item()] for label in pred_labels]

        if check_intersections == True and classes_to_track:
            # Perform intersection checks only if enabled and classes_to_track is specified
            for class_pair in classes_to_track:
                class1_boxes = pred_boxes[[name == class_pair[0] for name in pred_class_names]]
                class2_boxes = pred_boxes[[name == class_pair[1] for name in pred_class_names]]

                for box1 in class1_boxes:
                    for box2 in class2_boxes:
                        if intersects(box1, box2):
                            score += 1
                            print(f"Intersection detected between {class_pair[0]} and {class_pair[1]}, Score:", score)

        # Frame Tensor Conversion for Drawing
        frame_tensor = (255.0 * (frame_tensor - frame_tensor.min()) / (frame_tensor.max() - frame_tensor.min())).to(torch.uint8)
        frame_tensor = frame_tensor.squeeze().to(torch.uint8)

        # Draw bounding boxes and segmentation masks
        output_image = draw_bounding_boxes(frame_tensor, pred_boxes, labels=pred_class_names, colors="red")
        output_image = draw_segmentation_masks(output_image, (pred_masks > 0.7).squeeze(1), alpha=0.5, colors="blue")

        # Convert output image for displaying
        output_image = output_image.permute(1, 2, 0).cpu().numpy().astype(np.uint8)
        output_image = np.clip(output_image, 0, 255)  # Ensure values are within 0-255
        if check_intersections == True and classes_to_track:
            # Draw score text
            cv2.putText(output_image, f'Score: {score}', (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)

        cv2.imshow('Frame', output_image)
        
        # Just before cv2.imshow
        #print(f"Output Image Size: {output_image.shape}")

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

# Example usage
# process_video_check(video_path, model, device, classes, [('ball', 'rim')], threshold=0.5)


Writing going_modular/process_video_check.py


In [12]:
%%writefile train.py
import argparse
import json
from pathlib import Path
import torch
from torch.utils.data import random_split, DataLoader
from going_modular.utils import (get_device, create_directory 
                                ,get_project, download_videos_from_youtube
                                , delete_folder_and_video, load_classes_from_json
                                ,split_dataset)
from going_modular.coco_dataset import CustomCocoDataset
from going_modular.model_utils import (get_model_instance_segmentation
                                        , upload_to_huggingface)
from going_modular.engine import train_model
from going_modular.transforms import get_transform
from going_modular.process_video_check import process_video_check
import utils
import shutil
import os



def main(args):
    data_path = Path(args.data_path)
    model_path = Path(args.model_path)
    create_directory(data_path)
    create_directory(model_path)

    # Check if the project data is already downloaded
    project_folder = Path(f'{args.project_folder_name}-{args.version}')
    classes_path = project_folder / 'train' / '_annotations.coco.json'

    if not project_folder.exists() or not classes_path.exists():
        print("Downloading project data...")
        get_project(args.api_key, args.workspace, args.project_name, args.version)

    # Load classes from JSON
    classes = load_classes_from_json(classes_path)
    print("Classes loaded:", classes)

    num_classes = len(classes) + 1
    device = get_device()
    model = get_model_instance_segmentation(num_classes, hidden_layer=args.hidden_layer)
    model.to(device)

    if args.mode == 'train':
        video_filename = args.video_name if args.video_name.endswith('.mp4') else f"{args.video_name}.mp4"
        video_path = Path(args.data_path) / video_filename
        # Load datasets
        datasets = {}
        data_loaders = {}
        for dtype in ['train', 'valid', 'test']:
            ann_path = project_folder / dtype / '_annotations.coco.json'
            img_dir = project_folder / dtype
            if ann_path.exists() and img_dir.exists():
                datasets[dtype] = CustomCocoDataset(str(ann_path), str(img_dir), transforms=get_transform(train=dtype=='train'))
                batch_size = 2 if dtype == 'train' else 1
                data_loaders[dtype] = DataLoader(datasets[dtype], batch_size=batch_size, shuffle=dtype=='train', num_workers=0, collate_fn=utils.collate_fn)
                print(f"{dtype.capitalize()} dataset loaded.")
            else:
                print(f"{dtype.capitalize()} dataset not found or incomplete. Skipping.")

        if 'train' in datasets and 'valid' not in datasets:
            print("Splitting dataset into train and valid...")
            train_dataset, valid_dataset = split_dataset(datasets['train'])
            data_loaders['train'] = DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=0, collate_fn=utils.collate_fn)
            data_loaders['valid'] = DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=0, collate_fn=utils.collate_fn)

        if 'train' in data_loaders and 'valid' in data_loaders:
            print("Starting training process...")
            train_model(model, data_loaders['train'], data_loaders.get('valid'), device, args.num_epochs, lr=args.lr)
            model_file_path = model_path / 'model_weights.pth'
            torch.save(model.state_dict(), model_file_path)
            print(f"Model saved at: {model_file_path}")
        else:
            print("Training failed. No valid data loaders available.")
            
        if args.delete_folder_and_video == True:
            delete_folder_and_video(project_folder, video_path)

    # Process video if mode is 'process_video'
    elif args.mode == 'process_video':
        video_filename = args.video_name if args.video_name.endswith('.mp4') else f"{args.video_name}.mp4"
        video_path = Path(args.data_path) / video_filename
        if not video_path.exists():
            print("Downloading video...")
            download_videos_from_youtube([args.video_url], str(Path(args.data_path)))

        if video_path.exists():
            model_file_path = model_path / 'model_weights.pth'
            if model_file_path.exists():
                model.load_state_dict(torch.load(str(model_file_path), map_location=device))
                classes_to_track = list(zip(args.classes_to_track[::2], args.classes_to_track[1::2]))  # Convert flat list to list of tuples
                process_video_check(video_path, model, device, classes, classes_to_track, args.threshold, args.check_intersections)
            else:
                print("Model weights file not found. Please train the model first.")
        if args.delete_folder_and_video == True:
            delete_folder_and_video(project_folder, video_path)
            
    elif args.mode == 'hf_upload':
        hf_login()  # Ensure user is logged in

        # Automatically determine model directory (or you can still ask the user)
        model_directory = args.model_path  # Assuming this is where your model is saved
        if not Path(model_directory).exists():
            print(f"Model directory {model_directory} does not exist. Please specify a valid model directory.")
            return

        model_id = input("Enter a name for your model on Hugging Face (e.g., my-cool-model): ")
        try:
            upload_to_huggingface(model_directory, model_id)
        except Exception as e:
            print(f"An error occurred during model upload: {e}")
            
        if args.delete_folder_and_video == True:
            delete_folder_and_video(project_folder, video_path)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Train a model for object detection or process video")
    parser.add_argument('--api_key', type=str, default="your_roboflow_api_key", help='API key for Roboflow')
    parser.add_argument('--workspace', type=str, default="your_roboflow_workspace", help='Workspace name in Roboflow')
    parser.add_argument('--project_name', type=str, default="your_roboflow_project", help='Project name in Roboflow')
    parser.add_argument('--project_folder_name', type=str, default="your_roboflow_project_folder_name", help='Project folder name in Roboflow')
    parser.add_argument('--version', type=int, default=1, help='Version of the dataset in Roboflow')
    parser.add_argument('--hidden_layer', type=int, default=256, help='Hidden layer size for the MaskRCNN predictor')
    parser.add_argument('--lr', type=float, default=0.005, help='Learning rate')
    parser.add_argument('--num_epochs', type=int, default=10, help='Number of epochs to train the model')
    parser.add_argument('--video_url', type=str, default="https://www.youtube.com/watch?v=example_video_id", help='URL of the video to process')
    parser.add_argument('--video_name', type=str, default="your_youtube_video_name", help='Name of the video file (with extension) to process')
    parser.add_argument('--threshold', type=float, default=0.6, help='Detection threshold for process_video')
    parser.add_argument('--display_video', type=bool, default=True, help='Whether to display the video during processing')
    parser.add_argument('--data_path', type=str, default='results/data', help='Path to save downloaded data')
    parser.add_argument('--model_path', type=str, default='results/models', help='Path to save model weights')
    parser.add_argument('--mode', type=str, choices=['train', 'process_video', 'hf_upload'], default='train', help='Mode of operation: train or process_video')
    parser.add_argument('--delete_folder_and_video', type=bool, default=False, help='Whether to delete the image folder and download video after use')
    parser.add_argument('--check_intersections', type=bool, default=False , help='Enable intersection checks in video processing')
    parser.add_argument('--classes_to_track', nargs='+', help='Classes to track for intersections, specified as pairs', default=[])

    args = parser.parse_args()
    main(args)


Writing train.py


In [None]:
#example usage
!python train.py --api_key <api_key> \
                --workspace basketball-formations \
                --project_name basketball-and-hoop-7xk0h \
                --project_folder_name basketball-and-hoop \
                --version 11 \
                --hidden_layer 256 \
                --lr 0.005 \
                --num_epochs 1 \
                --threshold 0.6 \
                --video_url "https://www.youtube.com/watch?v=kh7s2tGvswc&t=1s" \
                --video_name "Devin Booker Sets Record, Wins Three-Point Contest" \
                --delete_folder_and_video True \
                --mode train 

                #--check_intersections True \
                #--classes_to_track Basketball Hoop \



In [14]:
#Car Model Ex:
#classes

!python train.py --api_key <api_key> \
                --workspace netventure \
                --project_name car_segment-yarm8 \
                --project_folder_name car_segment \
                --version 1 \
                --hidden_layer 256 \
                --lr 0.005 \
                --num_epochs 10 \
                --threshold 0.8 \
                --video_url "https://www.youtube.com/watch?v=boVidZ2K-QI" \
                --video_name "Driving for 1 minute" \
                --delete_folder_and_video True \
                --mode train 


                #--check_intersections True \
                #--classes_to_track Car Plane \



The system cannot find the file specified.


In [15]:
%%writefile app.py
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import uvicorn
import torch
import utils
from torchvision import transforms
from PIL import Image
import os
from pathlib import Path

# Import modular functions (make sure all functions are correctly imported)
from going_modular.utils import (get_device, create_directory, get_project,
                                 download_files, construct_dataset_paths,
                                 download_videos_from_youtube)
from going_modular.coco_dataset import CustomCocoDataset
from going_modular.model_utils import get_model_instance_segmentation, load_classes_from_json
from going_modular.engine import train_model
from going_modular.process_video_check import process_video_check
from going_modular.transforms import get_transform, transform_image

app = FastAPI()

# Global variables (Consider storing and accessing these more securely and flexibly)
MODEL_PATH = Path('results/models/model_weights.pth')
 

# Ensure MODEL_PATH directory exists
MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)

def load_model(num_classes: int, model_path: Path = MODEL_PATH):
    model = get_model_instance_segmentation(num_classes, hidden_layer=256)
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()
    return model

@app.get("/")
async def root():
    return {"message": "Welcome to the API!"}

@app.post("/train")
async def train(api_key: str, workspace: str, project_name: str, project_folder_name: str, version: int, num_epochs: int = 10):
    # Set up device
    device = get_device()
    
    # Ensure data and model directories exist
    data_path = Path('results/data')
    model_path = Path('results/models')
    data_path.mkdir(parents=True, exist_ok=True)
    model_path.mkdir(parents=True, exist_ok=True)
    
    # Download and prepare the dataset
    dataset = get_project(api_key, workspace, project_name, version)
    train_annotation_path, valid_annotation_path, test_annotation_path, train_image_dir, valid_image_dir, test_image_dir = construct_dataset_paths(project_folder_name, version)
    
    # Load class names and set up the model
    CLASSES_JSON = Path(f'{project_folder_name}-{version}/test/_annotations.coco.json')
    classes = load_classes_from_json(CLASSES_JSON)
    num_classes = len(classes) + 1
    
    model = get_model_instance_segmentation(num_classes, hidden_layer=256)
    model.to(device)
    
    # Prepare datasets
    train_dataset = CustomCocoDataset(train_annotation_path, train_image_dir, transforms=get_transform(train=True))
    valid_dataset = CustomCocoDataset(valid_annotation_path, valid_image_dir, transforms=get_transform(train=False))
    
    # Set up data loaders
    train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=0, collate_fn=utils.collate_fn)
    valid_data_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=1, shuffle=False, num_workers=0, collate_fn=utils.collate_fn)
    
    # Train the model
    train_model(model, train_data_loader, valid_data_loader, device, num_epochs)
    
    # Save the trained model
    torch.save(model.state_dict(), MODEL_PATH)
    
    return {"message": "Model trained and saved successfully"}


@app.post("/predict")
async def predict(project_folder_name: str, version: int, file: UploadFile = File(...)):
    CLASSES_JSON = Path(f'{project_folder_name}-{version}/test/_annotations.coco.json')
    classes = load_classes_from_json(CLASSES_JSON)
    num_classes = len(classes) + 1
    model = load_model(num_classes)
    
    image_bytes = await file.read()
    tensor = transform_image(image_bytes)
    
    # Prediction logic
    with torch.no_grad():
        outputs = model(tensor)
        probabilities = torch.nn.functional.softmax(outputs, dim=1)
        top_prob, top_catid = probabilities.topk(1, dim=1)
    
    predicted_class = classes[top_catid.item()]
    confidence = top_prob.item()
    
    return JSONResponse(content={"class": predicted_class, "confidence": confidence})

from fastapi import UploadFile, File
from pathlib import Path
import shutil

@app.post("/process_video")
async def process_video(project_folder_name: str, version: int, video_file: UploadFile = File(...)):
    # Define where to save the video temporarily
    temp_video_path = Path("temp_videos") / video_file.filename
    temp_video_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure directory exists

    # Save the uploaded video to the temporary path
    with temp_video_path.open("wb") as buffer:
        shutil.copyfileobj(video_file.file, buffer)
    
    # Load model and classes for processing
    CLASSES_JSON = Path(f'{project_folder_name}-{version}/test/_annotations.coco.json')
    classes = load_classes_from_json(CLASSES_JSON)
    num_classes = len(classes) + 1
    model = load_model(num_classes)
    
    # Assuming process_video_check is adapted to return a meaningful result
    # For example, modifying process_video_check to accept a video path and return a dictionary of results
    results = process_video_check(str(temp_video_path), model, get_device(), classes, [('Basketball', 'Hoop')], threshold=0.6)

    # Optionally, delete the temporary video file after processing
    temp_video_path.unlink(missing_ok=True)
    
    return results

if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=8000)

#!uivcorn app:app --reload

#For looking at all the options**
#http://127.0.0.1:8000/docs

Writing app.py
