In [1]:
import os
import shlex
import subprocess
import sys
import threading
import time

import cv2
import numpy as np
import torch
import torchvision.transforms as T
from torch import nn
# Disable autograd globally: this notebook only runs forward passes, so no
# gradients need to be tracked. The trailing `;` suppresses the cell output.
torch.set_grad_enabled(False);

In [2]:
# Scratch check: the three per-channel means passed to T.Normalize in
# `transform`, multiplied by 255.
np.array([0.485, 0.456, 0.406]) * 255

array([123.675, 116.28 , 103.53 ])

In [None]:
# COCO classes
# The drawing loop indexes this list directly with the argmax of the model's
# class probabilities, so the order of the entries matters.
# NOTE(review): the 'N/A' entries look like placeholders that keep the real
# names aligned with the class indices the model emits — confirm before editing.
CLASSES = [
    'N/A', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A',
    'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse',
    'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack',
    'umbrella', 'N/A', 'N/A', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis',
    'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
    'skateboard', 'surfboard', 'tennis racket', 'bottle', 'N/A', 'wine glass',
    'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich',
    'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake',
    'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A',
    'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard',
    'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A',
    'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
    'toothbrush'
]

# colors for visualization: three integer triplets that the drawing loop
# cycles through (it repeats the list with `COLORS * 100`), one per detection.
# NOTE(review): the triplets are handed unchanged to the OpenCV drawing calls
# after the frame has been converted to BGR, so OpenCV reads them in BGR
# order — confirm that this is the intended appearance.
COLORS = [
    [125,  46, 141],
    [118, 171,  47],
    [ 76, 189, 237]
]

In [None]:
# Frame preprocessing pipeline (standard PyTorch mean-std input image
# normalization): convert to a tensor, apply T.Resize(800), then normalize
# each channel with the mean / std constants below.
NORMALIZE_MEAN = [0.485, 0.456, 0.406]
NORMALIZE_STD = [0.229, 0.224, 0.225]

preprocessing_steps = [
    T.ToTensor(),
    T.Resize(800),
    T.Normalize(NORMALIZE_MEAN, NORMALIZE_STD),
]
transform = T.Compose(preprocessing_steps)

# for output bounding box post-processing
def box_cxcywh_to_xyxy(x):
    """Convert boxes from centre format to corner format.

    Args:
        x: Tensor whose last dimension has size 4 and holds
            ``(x_c, y_c, w, h)``. Any number of leading dimensions is
            accepted: a single box of shape ``(4,)``, a list of boxes
            ``(N, 4)``, a batch ``(B, N, 4)``, ...

    Returns:
        Tensor of the same shape as ``x`` holding
        ``(x_min, y_min, x_max, y_max)`` in its last dimension.
    """
    # Split and re-stack along the LAST axis instead of axis 1, so the function
    # is not tied to 2-D input. For (N, 4) input this is the same axis as before.
    x_c, y_c, w, h = x.unbind(-1)
    half_w, half_h = 0.5 * w, 0.5 * h
    corners = [x_c - half_w, y_c - half_h,
               x_c + half_w, y_c + half_h]
    return torch.stack(corners, dim=-1)

def rescale_bboxes(out_bbox, size):
    """Convert centre-format boxes to corner format and scale them to an image.

    Args:
        out_bbox: Tensor of ``(x_c, y_c, w, h)`` boxes, passed through
            ``box_cxcywh_to_xyxy`` before scaling.
        size: ``(width, height)`` of the target image, in that order. Note
            that this is the reverse of a NumPy image's ``shape[:2]``, which
            is ``(height, width)``.

    Returns:
        Tensor of ``(x_min, y_min, x_max, y_max)`` boxes where x values have
        been multiplied by ``width`` and y values by ``height``.
    """
    img_w, img_h = size
    # Create the scale factors on the same device as the boxes so the
    # multiplication also works when the boxes do not live on the CPU.
    scale = torch.tensor([img_w, img_h, img_w, img_h],
                         dtype=torch.float32, device=out_bbox.device)
    return box_cxcywh_to_xyxy(out_bbox) * scale

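# NOTE: disabled matplotlib-based visualisation, kept for reference only. It
# relies on `plt` (matplotlib.pyplot), which this notebook does not import.
# def plot_results(pil_img, prob, boxes):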
#     plt.figure(figsize=(16,10))
#     plt.imshow(pil_img)
#     ax = plt.gca()
#     colors = COLORS * 100
#     for p, (xmin, ymin, xmax, ymax), c in zip(prob, boxes.tolist(), colors):
#         ax.add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
#                                    fill=False, color=c, linewidth=3))
#         cl = p.argmax()
#         text = f'{CLASSES[cl]}: {p[cl]:0.2f}'
#         ax.text(xmin, ymin, text, fontsize=15,
#                 bbox=dict(facecolor='yellow', alpha=0.5))
#     plt.axis('off')
#     plt.show()

In [None]:
# Load the `detr_resnet50` entry point of the `facebookresearch/detr` repo
# through torch.hub, asking for pretrained weights.
# NOTE(review): presumably needs network access the first time it runs
# (before torch.hub has cached the repo and weights) — confirm.
model = torch.hub.load('facebookresearch/detr', 'detr_resnet50', pretrained=True)
# Put the model in evaluation mode; the trailing `;` hides the cell output.
model.eval();

In [None]:
# Frame geometry of the raw stream. The reader thread uses these values to
# compute how many bytes make up one frame (width * height * 3).
width = 1280
height = 1280

# ffmpeg reads the stream from tcp://gabriel.local:5001 and writes raw rgb24
# video to its stdout, which is captured through a pipe.
ffmpeg_command = "ffmpeg -nostdin -probesize 32 -flags low_delay -fflags nobuffer -codec:v h264_cuvid -r 25 -i tcp://gabriel.local:5001 -pix_fmt rgb24 -an -vcodec rawvideo -f rawvideo pipe:"
recGabriel = shlex.split(ffmpeg_command)
process = subprocess.Popen(recGabriel, stdout=subprocess.PIPE)



In [None]:
class CameraBufferCleanerThread(threading.Thread):
    """Background reader that keeps only the newest frame of a raw video pipe.

    The thread continuously reads fixed-size frames from
    ``camera_process.stdout`` and publishes the most recent complete one in
    ``last_frame`` (``None`` until the first frame has arrived). Consumers
    that poll ``last_frame`` therefore always see the latest frame instead of
    a backlog.

    The thread starts itself from the constructor and stops on its own when
    the pipe reaches end-of-file. It is a daemon thread, so a reader blocked
    on the pipe never keeps the interpreter alive.

    Args:
        camera_process: Object exposing a binary ``stdout`` with a
            ``read(n)`` method (e.g. a ``subprocess.Popen`` created with
            ``stdout=subprocess.PIPE``) that yields raw 3-bytes-per-pixel
            frames.
        name: Thread name.
        frame_width: Frame width in pixels. Defaults to the module-level
            ``width``.
        frame_height: Frame height in pixels. Defaults to the module-level
            ``height``.
    """

    def __init__(self, camera_process, name='camera-buffer-cleaner-thread',
                 frame_width=None, frame_height=None):
        super(CameraBufferCleanerThread, self).__init__(name=name, daemon=True)
        self.camera = camera_process
        self.last_frame = None
        # Fall back to the notebook-level geometry when no explicit size is given.
        self.frame_width = width if frame_width is None else frame_width
        self.frame_height = height if frame_height is None else frame_height
        self.start()

    def run(self):
        """Read frames until the stream ends, keeping only the latest one."""
        frame_bytes = self.frame_width * self.frame_height * 3
        while True:
            raw_frame = self.camera.stdout.read(frame_bytes)

            if not raw_frame:
                # An empty read means end-of-file (the producer exited or the
                # pipe was closed). Every further read would also be empty,
                # so stop instead of spinning forever.
                print('Camera stream closed, stopping reader thread.')
                break

            if len(raw_frame) != frame_bytes:
                # Truncated frame: skip it and keep the previous good frame.
                print('Error reading frame!!!')
                continue

            # Transform the bytes read into a numpy array, and reshape it to
            # video frame dimensions. The array is a read-only view over the
            # bytes object; copy it before modifying it in place.
            frame = np.frombuffer(raw_frame, np.uint8)
            self.last_frame = frame.reshape((self.frame_height, self.frame_width, 3))

# Attach the reader to the ffmpeg process; the constructor starts the thread
# itself, so frames begin to be consumed immediately.
CameraCleaner = CameraBufferCleanerThread(process)

In [None]:

# Detections whose best class probability is not above this value are dropped.
CONFIDENCE_THRESHOLD = 0.7

# Repeat the palette so there is a color for every detection in a frame
# (zip() below stops at the shortest of its inputs).
colors = COLORS * 100

try:
    while True:

        # Take the most recent frame published by the reader thread
        frame = CameraCleaner.last_frame

        # No frame has been decoded yet (e.g. ffmpeg is still connecting):
        # wait briefly instead of feeding None to the transform.
        if frame is None:
            time.sleep(0.01)
            continue

        # mean-std normalize the input image (batch-size: 1)
        batch = transform(frame).unsqueeze(0)

        # propagate through the model
        outputs = model(batch)

        # keep only confident predictions (the last logit is excluded)
        probas = outputs['pred_logits'].softmax(-1)[0, :, :-1]
        keep = probas.max(-1).values > CONFIDENCE_THRESHOLD

        # convert boxes from [0; 1] to image scales.
        # frame.shape is (height, width, channels) while rescale_bboxes
        # expects (width, height), so the two values must be swapped.
        frame_h, frame_w = frame.shape[:2]
        bboxes_scaled = rescale_bboxes(outputs['pred_boxes'][0, keep], (frame_w, frame_h))

        # Convert Image to OpenCV (returns a new BGR array)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        # Draw bounding boxes and labels of detections
        for p, (xmin, ymin, xmax, ymax), c in zip(probas[keep], bboxes_scaled.tolist(), colors):

            cl = int(p.argmax())
            label = f'{CLASSES[cl]}: {p[cl]:0.2f}'
            top_left = (int(xmin), int(ymin))

            cv2.rectangle(frame, top_left, (int(xmax), int(ymax)), c, 2)
            cv2.putText(frame, label, top_left, cv2.FONT_HERSHEY_SIMPLEX, 1, c, 2)

        # Show the frame
        cv2.imshow('frame', frame)

        # Press Q on keyboard to exit
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Close the preview window on normal exit and also when the cell is
    # interrupted or an error is raised.
    cv2.destroyAllWindows()

In [None]:
# Debug probe: run the preprocessing on the latest captured frame and display
# a slice of the result (index 1 on the first axis, row 400, columns 300-499).
frame = CameraCleaner.last_frame

normalized = transform(frame)
normalized[1, 400, 300:500]
