In [None]:
!pip3 install opencv-python  > /dev/null
!pip3 install albumentations > /dev/null

# Master

In [2]:
# Imports
import numpy as np
from PIL import Image
from torchvision import transforms as T
import cv2 as cv
import torch
import torch.utils.data
import os
import albumentations as A
import time
import sys 


# Set Variable
os.environ['KMP_DUPLICATE_LIB_OK']='True'
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Variables
BOX_COLOR = (0, 0, 225) # Red
BOX_THICKESS = 5
TEXT_COLOR = (100, 255, 0)
TEXT_FONT = 0
IMG_SIZE = (200, 200)
category_id_to_name = { 1: "dent", 2: "broken_glass", 3: "deflated_wheel", 4: "scratch", 5: "broken_headlight"}
transform_to_tensor = T.Compose([T.ToTensor()])

# UNUSED
def get_training_augmentation(_src, _bboxes, _category_id, _size):
    transform = A.Compose([A.Resize(_size[0], _size[1])], bbox_params=A.BboxParams(format='coco', label_fields=['labels']))
    return transform(image=_src, bboxes=_bboxes, category_id=_category_id)['bboxes']


def tensor_to_image(tensor):
    tensor = tensor*255
    tensor = np.array(tensor, dtype=np.uint8)
    if np.ndim(tensor)>3:
        assert tensor.shape[0] == 1
        tensor = tensor[0]
    return Image.fromarray(tensor)


def draw_box(_image, _left, _top, _right, _bottom, _name):
    TEXT_SIZE = round(get_optimal_font_scale(_name, _image.shape[1])/2)
    cv.rectangle(_image, (_left, _top), (_right, _bottom), BOX_COLOR, BOX_THICKESS)
    cv.putText(_image, _name, (_right+10, _bottom), TEXT_FONT, TEXT_SIZE, TEXT_COLOR)
    return _image


def get_optimal_font_scale(text, width):
    for scale in range(60, 0, -1):
        textSize = cv.getTextSize(text, fontFace=cv.FONT_HERSHEY_DUPLEX, fontScale=scale/10, thickness=1)
        if (textSize[0][0] <= width):
            return scale/10
    return 1


def get_image_with_boxes(img=None, prediction=None, resize=False, fx=0, fy=0, tolerance=0):
    h = 0
    array_bbox = []
    for left, top, right, bottom in prediction[0]['boxes']:
        if float(prediction[0]["scores"][h]) > tolerance:
            left, top, right, bottom = int(left), int(top), int(right), int(bottom)
            name = category_id_to_name.get(int(prediction[0]['labels'][h]))
            if resize:
                assert(fx != 0)
                assert(fy != 0)
                left = int(round(left * fx))
                top  = int(round(top * fy))
                right = round(right * fx)
                bottom = int(round(bottom * fy))
            array_bbox.append( (left, top, right, bottom, name) )                  # Save boxes
            img = draw_box(img, left, top, right, bottom, name)
        h += 1
    return img, array_bbox


def show_webcam(MIRROR=False, VIDEO_PATH=0, SHOW_FPS=False, STREAMING=False, RESIZE=False, CONSOLE=False):
    model = torch.load(PATH, map_location=device)
    model.eval()
    cam = cv.VideoCapture(VIDEO_PATH)
    frame_exist = cam.isOpened()
    assert(frame_exist)
    frame_id = 0
    last_bboxes = []
    fps_history = []
    FX = 1
    FY = 1

    prev_frame_time = 0
    new_frame_time = 0

    while frame_exist:
        frame_exist, img = cam.read()
        new_frame_time = time.time()

        if not frame_exist: break
        if MIRROR: img = cv.flip(img, 1)

        if RESIZE:
            FX = 500 / img.shape[1]
            FY = 500 / img.shape[0]
            debug_img = cv.resize(img, (0,0), fx=FX, fy=FY)
        else:
            debug_img = img

        '''
        Optimization trick:
        Nearest frames in camera or video streaming have the same objects which are approximately placed closely
        We can skip some next frames from predictor and move the boxes from previous frame to increase FPS in STREAMING mode
        Skipped frames output without latency
        '''
        FLAG = False
        if frame_id % FRAME_PRED == 0:
            FLAG = True
            img_T = transform_to_tensor(debug_img)
            with torch.no_grad(): prediction = model([img_T.to(device)])

            if len(prediction[0]["labels"]) > 0:  # Predictor find any class of damage?
                last_bboxes.clear()
                img, last_bboxes = get_image_with_boxes(img=img, prediction=prediction, resize=RESIZE, fx=1/FX, fy=1/FY, tolerance=TOLERANCE)
            else:
                last_bboxes.clear()

        elif STREAMING and len(last_bboxes):
            for left, top, right, bottom, name in last_bboxes:
                img = draw_box(img, left, top, right, bottom, name)
        
        if (FLAG or STREAMING) and SHOW_FPS:
            fps = 1/(new_frame_time-prev_frame_time)
            TEXT_SIZE = round(get_optimal_font_scale(f"FPS: 000000", img.shape[1])/2)
            prev_frame_time = new_frame_time
            fps_history.append(int(fps))
            if not CONSOLE:
                cv.putText(img, f"FPS: { sum(fps_history)/len(fps_history):0.2f}", (7, 70), TEXT_FONT, TEXT_SIZE, TEXT_COLOR, 3, cv.LINE_AA)
        

        if not CONSOLE and (FLAG or STREAMING):
            cv.imshow('my webcam', img)

        frame_id += 1
        if cv.waitKey(1) == 27: 
            break  # esc to quit
    if CONSOLE and len(fps_history):
        print(f"Average FPS: { sum(fps_history)/len(fps_history):0.2f}")
    cv.destroyAllWindows()
    return 


FRAME_PRED = 10                     # FPS of Prediction pipeline
TOLERANCE = 0.4                     # Predictors confidence of class
PATH = "include/model_70.pt"        # Path to predictor model

list = ["data/IMG_5367.MOV",
        "data/testcam.png",
        "data/car.jpeg",
        "data/carb.jpg",
        "data/car_2.jpg",
        "data/car3.jpg",
        0                     # Camera
]
path = list[0]

#show_webcam(VIDEO_PATH=path)           # Main function
show_webcam(VIDEO_PATH=path, MIRROR=False, SHOW_FPS=True, STREAMING=True, RESIZE=True, CONSOLE=False)


KeyboardInterrupt: 

# Perf tests

In [28]:
def perf_test(resize=False):
    import time
    list_time = []
    array = ["data/car3.jpg", "data/car_2.jpg", "data/carb.jpg", "data/car.jpeg", "data/testcam.png"]
    iters = 50

    for i in range(iters):
        path = array[i%len(array)]
        tic = time.perf_counter()
        show_webcam(VIDEO_PATH=path, RESIZE=resize)
        toc = time.perf_counter()
        list_time.append((tic, toc))

    average_time = 0
    for pair in list_time:
        average_time += pair[1] - pair[0]

    print(f"Perf demo test: {average_time/iters:0.4f} average seconds")


def perf_model_test(_path, crop=False):
    import time
    list_time = []
    model = torch.load(_path, map_location=device)
    model.eval()
    array = ["data/car3.jpg", "data/car_2.jpg", "data/carb.jpg", "data/car.jpeg", "data/testcam.png"]
    iters = 5

    for i in range(iters):
        cam = cv.VideoCapture(array[i%len(array)])
        _, img = cam.read()
        if crop:
            FX = 500 / img.shape[1]
            FY = 500 / img.shape[0]
            img = cv.resize(img, (0,0), fx=FX, fy=FY)
        img_T = transform_to_tensor(img)
        with torch.no_grad():
            tic = time.perf_counter()
            prediction = model([img_T])
            toc = time.perf_counter()
            list_time.append((tic, toc))

    average_time = 0
    for pair in list_time:
        average_time += pair[1] - pair[0]
    print(f"Perf model test: {average_time/iters:0.4f} average seconds")

In [29]:
perf_test(resize=False)
perf_test(resize=True)

Perf demo test: 0.8717 average seconds
Perf demo test: 0.6596 average seconds


In [32]:
perf_model_test("include/model_70.pt", crop=False)
perf_model_test("include/model_70.pt", crop=True)
'''perf_model_test("include/model_3.pt", crop=False)

perf_model_test("include/model_70.pt", crop=True)
perf_model_test("include/model_3.pt", crop=True)
#perf_model_test("include/model_70.pt", crop=True)
#perf_model_test("include/model_2.pt")
'''

Perf model test: 1.0469 average seconds
Perf model test: 0.9151 average seconds


'perf_model_test("include/model_3.pt", crop=False)\n\nperf_model_test("include/model_70.pt", crop=True)\nperf_model_test("include/model_3.pt", crop=True)\n#perf_model_test("include/model_70.pt", crop=True)\n#perf_model_test("include/model_2.pt")\n'