In [None]:
import os
import time
from functools import reduce
from ultralytics import YOLO
import numpy as np
import cv2
import pandas as pd
from torch import cuda

In [None]:
# Checkpoint file handed to YOLO(...) through YOLOSegmentation further down.
# The path is relative, so it resolves against the kernel's working directory.
model_path = "../models/saved_instances/yolov8x-seg.pt"

In [None]:
# To list every class name (id -> label) the model can predict, run:
# YOLO(model_path).names

In [None]:
# Class ids (model output indices) of the object types that check_object accepts as a vehicle.
# NOTE(review): presumably the COCO indices of the pretrained checkpoint — verify
# against YOLO(model_path).names before swapping in a different model.
CLASSES_IDS = [
    2, # Car
    7, # Truck
    5, # Bus
]

In [None]:
# Device string forwarded to the model's predict() call.
# PyTorch / Ultralytics name the GPU backend "cuda"; "gpu" is not a recognised device.
device = "cuda" if cuda.is_available() else "cpu"

In [None]:
class YOLOSegmentation:
    """Small wrapper that runs a YOLO segmentation model on a single image.

    The wrapper loads the checkpoint once and exposes :meth:`detect`, which
    returns boxes, class ids, pixel-space contours and confidence scores.
    """

    def __init__(self, model_path, device, conf=0.3):
        """
        :param model_path: checkpoint path passed to ``YOLO``
        :param device: device string forwarded to ``predict``
        :param conf: minimum confidence forwarded to ``predict``. The original
            note states the library default is 0.25; a higher value is used to
            skip partially visible (trimmed) vehicles we do not expect to get.
        """
        self.model = YOLO(model_path)
        self.device = device
        self.conf = conf

    @staticmethod
    def _empty_detection():
        """Return the 'nothing found' result with the same layout as a real detection."""
        return (
            np.empty((0, 4), dtype="int"),
            np.empty((0,), dtype="int"),
            [],
            np.empty((0,), dtype="float"),
        )

    def detect(self, img):
        """
        Run the model on ``img`` and collect the detections.

        :param img: image array whose first two dimensions are (height, width)
        :return: tuple ``(bboxes, class_ids, contours, scores)`` where
            ``bboxes`` holds the xyxy boxes cast to int, ``class_ids`` the
            predicted classes cast to int, ``contours`` a list of int32 point
            arrays in pixel coordinates of ``img`` and ``scores`` the
            confidences rounded to two decimals. All four are empty when the
            model returns nothing or produces no masks.
        """
        # Only height and width are needed; slicing also accepts 2-D (grayscale) input.
        height, width = img.shape[:2]

        results = self.model.predict(
            source=img.copy(), save=False, save_txt=False, device=self.device,
            conf=self.conf,
        )
        if not results:
            return self._empty_detection()

        result = results[0]
        # With zero detections the result carries no mask object at all.
        if result.masks is None:
            return self._empty_detection()

        segmentation_contours_idx = []
        for seg in result.masks.xyn:
            seg = np.asarray(seg)
            # Scale normalised (x, y) pairs to pixels on a fresh array so the
            # arrays owned by the result object are not modified in place.
            scale = np.asarray([width, height], dtype=seg.dtype)
            segmentation_contours_idx.append((seg * scale).astype(np.int32))

        bboxes = np.array(result.boxes.xyxy.to('cpu'), dtype="int")
        class_ids = np.array(result.boxes.cls.to('cpu'), dtype="int")
        scores = np.array(result.boxes.conf.to('cpu'), dtype="float").round(2)

        return bboxes, class_ids, segmentation_contours_idx, scores


In [None]:
def get_max_area(indexed_box1, indexed_box2):
    """
    Binary reducer for 'reduce': keeps whichever indexed box covers more area.
    Each argument is a pair `index, (x, y, x2, y2)`; the second argument wins
    when both areas are equal.
    :param indexed_box1: first pair `index, bbox`
    :param indexed_box2: second pair `index, bbox`
    :return: the argument (unchanged) whose bbox has the larger area
    """
    def _area(indexed_box):
        _, (left, top, right, bottom) = indexed_box
        return (right - left) * (bottom - top)

    first_area = _area(indexed_box1)
    second_area = _area(indexed_box2)

    if first_area > second_area:
        return indexed_box1
    return indexed_box2

In [None]:
def check_object(bboxes, classes, target_classes=None):
    """
    Pick the detection that most likely is the photographed vehicle.
    Only detections whose class is in `target_classes` are considered; when
    there is more than one of them, the one with the largest box area wins
    (ties resolve to the earliest detection).
    :param bboxes: sequence of boxes `(x, y, x2, y2)`, aligned with `classes`
    :param classes: sequence of class ids, one per box
    :param target_classes: accepted class ids; defaults to `CLASSES_IDS`
    :return: tuple `index, bbox` of the chosen detection, where `index` is its
        position in `bboxes`/`classes`, or `None, None` when nothing matches
    """
    if target_classes is None:
        target_classes = CLASSES_IDS

    def _area(bbox):
        left, top, right, bottom = bbox
        return (right - left) * (bottom - top)

    # Keep the true position of every accepted detection so the caller can use
    # it to look up the matching segmentation contour.
    candidates = [
        (index, bbox)
        for index, (object_class, bbox) in enumerate(zip(classes, bboxes))
        if object_class in target_classes
    ]
    if not candidates:
        return None, None

    # Compare accepted detections only; boxes of other classes must not win.
    return max(candidates, key=lambda indexed_box: _area(indexed_box[1]))


In [None]:
def extract_contours(folder: str, output_dir: str, Segmentator: "YOLOSegmentation", transform_to_drawings=False, skip_exist=False) -> list:
    """
    Cut the main vehicle out of every JPEG in `folder` and save it to `output_dir`.

    Each image is halved in size, segmented, masked so that only the chosen
    object stays visible, optionally converted to an edge drawing, cropped to
    the object's box and written under the same file name.

    :param folder: directory containing the source images
    :param output_dir: directory for the results (created when missing)
    :param Segmentator: object exposing `detect(img)` that returns
        `bboxes, class_ids, contours, scores`
    :param transform_to_drawings: when True, store Canny edges of the blurred
        cut-out instead of the colour cut-out
    :param skip_exist: when True, skip images already present in `output_dir`
    :return: list of messages for the images that could not be processed
    """
    logs = []

    def _report(message):
        # Show the problem right away and keep it for the returned log.
        print(message)
        logs.append(message)

    # Sorted listing keeps processing and log order reproducible between runs.
    image_names = sorted(os.listdir(folder))
    os.makedirs(output_dir, exist_ok=True)

    for image_name in image_names:
        # Case-insensitive so that files such as "IMG_1.JPG" are not skipped.
        if not image_name.lower().endswith((".jpg", ".jpeg")):
            continue
        start_path = os.path.join(folder, image_name)
        end_path = os.path.join(output_dir, image_name)
        if skip_exist and os.path.exists(end_path):
            continue

        img = cv2.imread(start_path)
        if img is None:
            _report(f"Can't read an image: {image_name}")
            continue
        img = cv2.resize(img, None, fx=0.5, fy=0.5)

        bboxes, classes, segmentations, _scores = Segmentator.detect(img)

        b_id, bbox = check_object(bboxes=bboxes, classes=classes)
        if bbox is None:
            _report(f"Can't extract car at image: {image_name}")
            continue
        points = np.array(segmentations[b_id])

        # Filled contour of the chosen object; everything outside it is blacked out.
        mask = np.zeros(img.shape[:2], dtype=np.uint8)
        cv2.drawContours(mask, [points], -1, (255, 255, 255), -1, cv2.LINE_AA)
        img = cv2.bitwise_and(img, img, mask=mask)

        if transform_to_drawings:
            blurred_image = cv2.GaussianBlur(img, (5, 5), 0)
            img = cv2.Canny(blurred_image, 100, 160)

        (x, y, x2, y2) = bbox
        img = img[y:y2, x:x2]
        if img.size == 0:
            # A degenerate box yields nothing that can be encoded.
            _report(f"Empty crop for image: {image_name}")
            continue

        # imwrite signals failure through its return value rather than an exception.
        if not cv2.imwrite(end_path, img):
            _report(f"Can't save an image: {image_name}")
            continue
        # NOTE(review): short pause kept from the original; its purpose is not
        # visible here (presumably lets notebook output keep up) — confirm or drop.
        time.sleep(0.01)
    return logs


In [None]:
# Input and output roots of the train/test batch run (that loop is currently commented out below).
# Both are relative paths, so they resolve against the kernel's working directory.
WHOLE_DATA_PATH = "../data/raw_in_one_folder"
RESULT_PATH = "../data/processed_dataset"

In [None]:
# for dataset in ['train', 'test']:
#     folder = WHOLE_DATA_PATH + "/" + dataset
#     result_folder = RESULT_PATH + "/" + dataset
#
#     result = extract_contours(folder=folder,
#                      output_dir=result_folder,
#                      Segmentator=YOLOSegmentation(model_path, device),
#                      transform_to_drawings=False,
#                      skip_exist=True)
#     print(result)

In [None]:
# Segment the validation set: JPEG files from `folder` are processed by
# extract_contours and the cut-outs are written to `result_folder`.
folder = "../data/val_dataset"
result_folder = "../data/val_dataset_segmented"

# The call is the last expression of the cell, so the returned list of
# failure messages is displayed as the cell output.
extract_contours(folder=folder,
                 output_dir=result_folder,
                 Segmentator=YOLOSegmentation(model_path, device),
                 transform_to_drawings=False)