### Base

In [19]:
import os
import random
import tqdm
import argparse

import numpy as np
from numpy import random
import torch
import cv2
from PIL import Image
import pandas as pd
import matplotlib.pyplot as plt

from ultralytics import YOLO # yolov8

import warnings
warnings.filterwarnings("ignore")

In [5]:
# Prefer the GPU when PyTorch reports one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [6]:
# Module-level YOLOv8-nano model shared by train(), val() and predict().
# The recorded cell output shows the checkpoint being downloaded to this path.
model = YOLO('model/yolov8n.pt')

Downloading https://github.com/ultralytics/assets/releases/download/v8.1.0/yolov8n.pt to 'model\yolov8n.pt'...


100%|██████████| 6.23M/6.23M [00:00<00:00, 19.7MB/s]


### Support Functions

In [7]:
# Per-object history of drawn centre points, keyed by track id; draw_boxes()
# creates one bounded deque per id and appends to it on every call.
data_deque = {}
# DeepSort tracker instance; stays None until init_tracker() assigns it.
deep_sort = None

In [8]:
def compute_color_for_labels(label):
    """
    Derive a deterministic RGB colour from a label.

    The label is wrapped into the 24-bit range and then split into three
    8-bit channels, most significant byte first.
    :param label: The label for which to compute the color.
    :return: The computed color as a tuple ``(r, g, b)``, each in 0..255.
    """
    packed = label % (1 << 24)  # wrap into 0 .. 16,777,215
    upper, b = divmod(packed, 256)  # low byte is the blue channel
    r, g = divmod(upper, 256)  # remaining two bytes are red and green
    return r % 256, g, b

In [9]:
def draw_border(img, pt1, pt2, color, thickness, r, d):
    '''
    Paint a round-cornered box between two points onto the image.

    Each corner gets two short line segments and a 90-degree arc; the interior
    is then covered by two filled rectangles and a thick dot at every arc centre.
    :param img: The image to draw on (modified in place).
    :param pt1: Top left point as ``(x, y)``.
    :param pt2: Bottom right point as ``(x, y)``.
    :param color: Color used for every primitive.
    :param thickness: Thickness of the corner lines and arcs.
    :param r: Corner radius.
    :param d: Length of the straight corner segments.
    :return: The same image object that was passed in.
    '''
    left, top = pt1
    right, bottom = pt2

    # One row per corner: (corner x, corner y, x direction, y direction, arc rotation)
    corner_specs = (
        (left, top, 1, 1, 180),      # top left
        (right, top, -1, 1, 270),    # top right
        (left, bottom, 1, -1, 90),   # bottom left
        (right, bottom, -1, -1, 0),  # bottom right
    )

    arc_centres = []
    for cx, cy, step_x, step_y, rotation in corner_specs:
        inner_x = cx + step_x * r
        inner_y = cy + step_y * r
        # horizontal segment, vertical segment, then the quarter arc joining them
        cv2.line(img, (inner_x, cy), (cx + step_x * (r + d), cy), color, thickness)
        cv2.line(img, (cx, inner_y), (cx, cy + step_y * (r + d)), color, thickness)
        cv2.ellipse(img, (inner_x, inner_y), (r, r), rotation, 0, 90, color, thickness)
        arc_centres.append((inner_x, inner_y))

    # Filled rectangles (thickness -1) cover the area between the corners
    cv2.rectangle(img, (left + r, top), (right - r, bottom), color, -1, cv2.LINE_AA)
    cv2.rectangle(img, (left, top + r), (right, bottom - r - d), color, -1, cv2.LINE_AA)

    # Thick dots at the arc centres cover the corner regions the rectangles leave open
    for centre in arc_centres:
        cv2.circle(img, centre, 2, color, 12)

    return img

In [10]:
def UI_box(x, img, color = None, label = None, line_thickness = None):
    """
    Draw a bounding box, and optionally a labelled tag at its top-left corner, on the image.
    :param x: The coordinates of the box as ``(x1, y1, x2, y2)``.
    :param img: The image to draw the box on (modified in place).
    :param color: The color of the box; a random color is used when ``None``.
    :param label: The label of the box; no tag is drawn when empty or ``None``.
    :param line_thickness: The thickness of the box; derived from the image size when not given.
    :return: The image with the drawn box.
    """
    tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thickness
    if color is None:
        # Explicit None check so array-like colours are not truth-tested.
        # `random` may be the stdlib module or numpy.random in this file
        # (both are imported), so coerce the channels to plain ints.
        color = [int(random.randint(0, 255)) for _ in range(3)]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
    if label:
        tf = max(tl - 1, 1)  # font thickness
        t_size = cv2.getTextSize(label, 0, fontScale = tl / 3, thickness = tf)[0]

        # Rounded background sized to the text, sitting just above the box corner
        img = draw_border(img, (c1[0], c1[1] - t_size[1] - 3), (c1[0] + t_size[0], c1[1] + 3), color, 1, 8, 2)

        cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness = tf, lineType = cv2.LINE_AA)

    # The docstring has always promised the image; previously nothing was returned
    return img

In [24]:
from deep_sort.utils.parser import get_config
from deep_sort.deep_sort import DeepSort
from collections import deque

In [26]:
def init_tracker(): # Apply DEEPSORT
    """
    Build the DeepSort tracker and store it in the module-level ``deep_sort``.

    Settings are read from deep_sort/configs/deep_sort.yaml.
    """
    global deep_sort

    cfg_deep = get_config()
    cfg_deep.merge_from_file("deep_sort/configs/deep_sort.yaml")
    tracker_cfg = cfg_deep.DEEPSORT

    # Keyword options forwarded to the tracker; CUDA use is always requested
    tracker_options = {
        "max_dist": tracker_cfg.MAX_DIST,
        "min_confidence": tracker_cfg.MIN_CONFIDENCE,
        "nms_max_overlap": tracker_cfg.NMS_MAX_OVERLAP,
        "max_iou_distance": tracker_cfg.MAX_IOU_DISTANCE,
        "max_age": tracker_cfg.MAX_AGE,
        "n_init": tracker_cfg.N_INIT,
        "nn_budget": tracker_cfg.NN_BUDGET,
        "use_cuda": True,
    }
    deep_sort = DeepSort(tracker_cfg.REID_CKPT, **tracker_options)

In [27]:
def draw_boxes(img, bbox, names, object_id, identities=None, offset=(0, 0)):
    """
    Draws the bounding boxes, labels and motion trails on the image.
    :param img: The image to draw the bounding boxes on (modified in place).
    :param bbox: The bounding boxes, each as ``(x1, y1, x2, y2)``.
    :param names: The names of the objects, indexed by the values in ``object_id``.
    :param object_id: The class index of each box; boxes without an entry are skipped.
    :param identities: The identities (track ids) of the objects; every box uses id 0 when ``None``.
    :param offset: The ``(x, y)`` offset added to the bounding boxes.
    :return: The image with the drawn bounding boxes.
    """
    for i, box in enumerate(bbox):
        # A box without a class id has no colour or name; skip it before
        # allocating a trail buffer for it.
        if i >= len(object_id):
            continue

        x1, y1, x2, y2 = [int(v) for v in box]
        x1 += offset[0]
        x2 += offset[0]
        y1 += offset[1]
        y2 += offset[1]

        # centre of the bottom edge
        center = (int((x2 + x1) / 2), y2)

        # get ID of object (named track_id so the builtin `id` is not shadowed)
        track_id = int(identities[i]) if identities is not None else 0

        # create new buffer for new object
        if track_id not in data_deque:
            data_deque[track_id] = deque(maxlen = 64)
        trail = data_deque[track_id]

        class_id = object_id[i]
        color = compute_color_for_labels(class_id)
        label = f"{track_id:d}:{names[class_id]}"

        # Pass the offset-adjusted corners so the box lines up with its trail
        UI_box((x1, y1, x2, y2), img, label = label, color = color, line_thickness = 2)

        # draw center of bottom edge
        cv2.circle(img, center, 2, color, 12)

        # Newest point first: the thickness below shrinks as the index grows,
        # so the trail is thickest at the object and tapers towards the past.
        trail.appendleft(center)
        points = list(trail)
        for j, (newer, older) in enumerate(zip(points, points[1:]), start=1):
            if newer is None or older is None:
                continue
            thickness = int(np.sqrt(64 / float(j + 1)) * 2)
            cv2.line(img, newer, older, color, thickness)
    return img

In [28]:
def train(data, epochs, time, patience, batch, imgsz, save = True, device = None, worker = -1, project = None, name = None, pretrained = True, verbose = False, seed = 0, optimizer = 'auto'):
    """
    Trains the module-level ``model`` with the given data.

    NOTE(review): this relies on ``model`` providing ``configure_optimizers``,
    a forward call returning ``(loss, outputs)`` and ``save(name, epoch, project)``;
    confirm that the object created with ``YOLO(...)`` supports these.

    :param data: The dataset to train on; it is wrapped in a ``DataLoader`` and must expose ``collate_fn``.
    :param epochs: The maximum number of epochs to train the model.
    :param time: Wall-clock budget in seconds, checked after each epoch; ``None`` disables the limit.
    :param patience: Stop early after this many consecutive epochs without a fitness improvement; ``None`` or 0 disables early stopping.
    :param batch: The batch size for the training.
    :param imgsz: The size of the images (currently not used by this function).
    :param save: Whether to save the model whenever the fitness improves.
    :param device: The device to train the model on; chosen automatically when ``None``.
    :param worker: The number of data-loading workers; -1 picks ``min(cpu_count, 8)``.
    :param project: The project name passed to ``model.save``.
    :param name: The model name passed to ``model.save``.
    :param pretrained: Whether to use a pretrained model; together with CUDA availability this enables half precision.
    :param verbose: Whether to print a summary line per epoch.
    :param seed: The seed for the random number generators; ``None`` skips seeding.
    :param optimizer: The optimizer specification passed to ``model.configure_optimizers``.
    :return: The trained model.
    """
    # Function-scope imports: the `time` parameter shadows the module of the
    # same name, and `from numpy import random` at file level shadows the
    # stdlib `random`, so neither module is reachable by its plain name here.
    import random as stdlib_random
    import time as time_module

    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if worker == -1:
        # os.cpu_count() may return None
        worker = min(os.cpu_count() or 1, 8)
    if seed is not None:
        stdlib_random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)

    # Decide precision once so model weights and inputs always agree
    use_half = bool(pretrained) and torch.cuda.is_available()
    if use_half:
        model.half()
    model.to(device).train()

    dataset = data
    dataloader = torch.utils.data.DataLoader(dataset, batch_size = batch, num_workers = worker, shuffle = True, pin_memory = True, collate_fn = dataset.collate_fn)
    optim = model.configure_optimizers(optimizer)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optim, milestones = [round(0.5 * epochs), round(0.75 * epochs)], gamma = 0.1)
    num_batches = max(len(dataloader), 1)  # avoid dividing by zero on an empty loader
    start_time = time_module.time()
    best_fitness = 0.0
    stale_epochs = 0  # consecutive epochs without a fitness improvement

    for epoch in range(epochs):
        model.train()
        model.fitness = 0
        model.loss = 0
        model.metrics = []
        # `tqdm` is imported as a module in this file, so call tqdm.tqdm
        pbar = tqdm.tqdm(dataloader, desc = f'Epoch {epoch + 1}/{epochs}', unit = 'batch')
        for i, (imgs, targets, paths, _) in enumerate(pbar):
            imgs = imgs.to(device, non_blocking=True)
            if use_half:
                imgs = imgs.half()
            targets = targets.to(device)
            model.zero_grad()
            loss, outputs = model(imgs, targets)
            loss.backward()
            optim.step()
            model.loss += loss.item()
            model.fitness += outputs[0].mean().item()
            model.metrics.append(outputs)
            pbar.set_postfix(loss=model.loss / (i + 1), fitness=model.fitness / (i + 1))
        model.loss /= num_batches
        model.fitness /= num_batches
        scheduler.step()

        if model.fitness > best_fitness:
            best_fitness = model.fitness
            stale_epochs = 0
            if save:
                model.save(name, epoch, project)
        else:
            stale_epochs += 1

        if verbose:
            print(f'Epoch {epoch + 1}/{epochs}, Fitness: {model.fitness:.6f}, Loss: {model.loss:.6f}')
        # Wall-clock budget exhausted
        if time is not None and time_module.time() - start_time > time:
            break
        # Early stopping
        if patience and stale_epochs >= patience:
            break
    return model

In [29]:
def val(data, device = None, worker = -1, verbose = False, seed = 0):
    """
    Validates the module-level ``model`` with the given data.

    NOTE(review): this relies on ``model(imgs, targets)`` returning
    ``(loss, outputs)``; confirm that the object created with ``YOLO(...)``
    supports this call.
    NOTE(review): inputs are cast to half precision whenever CUDA is available,
    while the model is not cast here; confirm the model's precision matches.

    :param data: The dataset to validate on; it is wrapped in a ``DataLoader`` and must expose ``collate_fn``.
    :param device: The device to validate the model on; chosen automatically when ``None``.
    :param worker: The number of data-loading workers; -1 picks ``min(cpu_count, 8)``.
    :param verbose: Whether to print the validation summary.
    :param seed: The seed for the random number generators; ``None`` skips seeding.
    :return: The model, with ``loss``, ``fitness`` and ``metrics`` attributes holding the validation results.
    """
    # `from numpy import random` at file level shadows the stdlib module, so
    # import it under a distinct local name to seed it as well.
    import random as stdlib_random

    if device is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if worker == -1:
        # os.cpu_count() may return None
        worker = min(os.cpu_count() or 1, 8)
    if seed is not None:
        stdlib_random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)
    model.to(device).eval()
    dataset = data
    dataloader = torch.utils.data.DataLoader(dataset, batch_size = 1, num_workers = worker, shuffle = False, pin_memory = True, collate_fn = dataset.collate_fn)
    num_batches = max(len(dataloader), 1)  # avoid dividing by zero on an empty loader
    model.fitness = 0
    model.loss = 0
    model.metrics = []
    # `tqdm` is imported as a module in this file, so call tqdm.tqdm
    pbar = tqdm.tqdm(dataloader, desc = 'Validation', unit = 'batch')
    for i, (imgs, targets, paths, _) in enumerate(pbar):
        imgs = imgs.to(device, non_blocking = True).half() if torch.cuda.is_available() else imgs.to(device, non_blocking = True)
        targets = targets.to(device)
        with torch.no_grad():
            loss, outputs = model(imgs, targets)
        model.loss += loss.item()
        model.fitness += outputs[0].mean().item()
        model.metrics.append(outputs)
        pbar.set_postfix(loss = model.loss / (i + 1), fitness=model.fitness / (i + 1))
    model.loss /= num_batches
    model.fitness /= num_batches
    if verbose:
        print(f'Fitness: {model.fitness:.6f}, Loss: {model.loss:.6f}')
    return model

In [30]:
def predict(img_test = None, vid_test = None):
    """
    Predicts the objects in the given image and/or video with the module-level ``model``.

    An image is shown on screen with its detections drawn; a video is written
    frame by frame, with detections drawn, to ``output.mp4`` at 10 fps.
    :param img_test: Path of the image to run prediction on, or ``None`` to skip.
    :param vid_test: Path of the video to run prediction on, or ``None`` to skip.
    :return: The model that was used.
    """
    if img_test is not None:
        with Image.open(img_test) as img:
            results = model(img)
        # The model returns a list with one result per input image.
        for res in results:
            annotated = res.plot()  # BGR numpy array with the detections drawn
            Image.fromarray(annotated[..., ::-1]).show()  # reverse channels to RGB for PIL

    if vid_test is not None:
        vid = cv2.VideoCapture(vid_test)
        result = None
        try:
            frame_width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
            size = (frame_width, frame_height)
            # NOTE(review): the output frame rate is fixed at 10 fps regardless of the source
            result = cv2.VideoWriter('output.mp4', cv2.VideoWriter_fourcc(*'MP4V'), 10, size)
            while True:
                ret, frame = vid.read()
                if not ret:
                    break
                # OpenCV frames are BGR arrays; pass them straight to the model
                # rather than through PIL, which would treat them as RGB.
                annotated = model(frame)[0].plot()
                result.write(annotated)
        finally:
            # Release the capture and writer even if prediction fails part-way
            vid.release()
            if result is not None:
                result.release()
    return model

In [None]:
# Example invocation of the three entry points: train, validate, then predict.
# NOTE(review): `data`, `epochs`, `time`, `patience`, `batch` and `imgsz` are
# not assigned anywhere in the visible file; define them before running this
# cell, otherwise the train() call raises NameError.
if __name__ == "__main__":
    train(data, epochs, time, patience, batch, imgsz, save=True, device=None, worker=-1, project=None, name=None, pretrained=True, verbose=False, seed=0, optimizer='auto')
    val(data, device = None, worker = -1, verbose = False, seed = 0)
    # With both arguments None, predict() skips the image and video branches.
    predict(img_test = None, vid_test = None)

In [None]:
# Command-line entry point: choose training, validation or inference via flags.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='YOLOv8 Training and Inference')

    # Add arguments for training
    parser.add_argument('--train', action = 'store_true', help = 'Train the model')
    parser.add_argument('--data', type = str, default = 'path/to/data.yaml', help = 'Path to the data YAML file')
    parser.add_argument('--epochs', type = int, default = 100, help = 'Number of epochs for training')
    parser.add_argument('--time', type = float, default = 3600, help = 'Time limit for training (in seconds)')
    parser.add_argument('--patience', type = int, default = 100, help = 'Patience for early stopping')
    parser.add_argument('--batch', type = int, default = 16, help = 'Batch size for training')
    parser.add_argument('--imgsz', type = int, default = 640, help = 'Image size for training')
    parser.add_argument('--save', action = 'store_true', help = 'Save the trained model')
    parser.add_argument('--device', type = str, default = None, help = 'Device for training (cpu or cuda)')
    parser.add_argument('--worker', type = int, default = -1, help = 'Number of workers for data loading')
    parser.add_argument('--project', type = str, default = 'runs/train', help = 'Project directory for saving the model')
    parser.add_argument('--name', type = str, default = 'exp', help = 'Name of the experiment')
    parser.add_argument('--pretrained', action = 'store_true', help = 'Use a pretrained model')
    parser.add_argument('--verbose', action = 'store_true', help = 'Print training information')
    parser.add_argument('--seed', type = int, default = None, help = 'Random seed for training')
    parser.add_argument('--optimizer', type = str, default = 'auto', help = 'Optimizer for training')

    # Add arguments for validation
    parser.add_argument('--val', action = 'store_true', help = 'Validate the model')
    parser.add_argument('--val_data', type = str, default = 'path/to/val_data.yaml', help = 'Path to the validation data YAML file')

    # Add arguments for inference
    parser.add_argument('--predict', action = 'store_true', help = 'Perform inference on an image or video')
    parser.add_argument('--img', type = str, default = None, help = 'Path to the input image')
    parser.add_argument('--vid', type = str, default = None, help = 'Path to the input video')

    args = parser.parse_args()

    # Train the model.
    # train() works on the module-level `model`; it takes no model argument,
    # so everything is passed by keyword to keep each value on its parameter.
    # NOTE(review): --data is a path string, while train() wraps `data` in a
    # DataLoader and reads `data.collate_fn`; confirm how the dataset should
    # be built from the YAML path.
    if args.train:
        train(
            data = args.data,
            epochs = args.epochs,
            time = args.time,
            patience = args.patience,
            batch = args.batch,
            imgsz = args.imgsz,
            save = args.save,
            device = args.device,
            worker = args.worker,
            project = args.project,
            name = args.name,
            pretrained = args.pretrained,
            verbose = args.verbose,
            seed = args.seed,
            optimizer = args.optimizer,
        )

    # Validate the model (val() also uses the module-level `model`)
    elif args.val:
        val(args.val_data)

    # Perform inference; predict() names its parameters img_test / vid_test
    elif args.predict:
        if args.img:
            predict(img_test = args.img)
        elif args.vid:
            predict(vid_test = args.vid)
        else:
            print("Please provide either an image or a video for inference.")