In [2]:
import mmcv

# The return value is discarded: the call is not the last expression of the
# cell, so nothing is displayed.
# NOTE(review): presumably meant to report the mmcv environment -- print or
# display the result if that is the intent.
mmcv.collect_env()

from mmcv.runner import load_checkpoint
from mmdet.apis import inference_detector
from mmrotate.models import build_detector

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from scipy.ndimage import rotate
import sys, os

# Extend the import search path with ./protos (resolved against the current
# working directory) before the messaging_pb2 imports below.
# NOTE(review): presumably the generated gRPC modules live there -- confirm.
sys.path.append("./protos")

import grpc
import messaging_pb2
import messaging_pb2_grpc

# NOTE(review): os and sys are already imported above; the repeats are harmless.
import os
import sys
import random
import cv2
import time

class ImageProcessingLayer:
    """Supplies image tiles to the rest of the pipeline.

    With ``mock=True`` (the default) the layer simulates a camera moving over
    one large image: random ground control points (GCPs) are drawn inside the
    image, joined into a piecewise-linear path, and a fixed-size tile rotated
    to the current heading is cut out at every path sample.  With
    ``mock=False`` the layer is a pass-through and ``run`` hands back the
    image it receives.

    GCPs and path entries store pixel positions as ``(x, y)``, i.e.
    ``(column, row)``.
    """

    # A segment of length L pixels contributes int(L / _STEP_PIXELS) samples.
    _STEP_PIXELS = 90

    def __init__(
        self,
        mock=True,
        mock_image_path=None,
        mock_num_samples=10,
        mock_wait_time=1,
    ):
        """Prepare the mock flight path, or nothing at all when ``mock`` is False.

        Args:
            mock: When False, no mock state is created and ``run`` requires an image.
            mock_image_path: Image to fly over; defaults to ``data/demo.jpg``.
            mock_num_samples: Number of random GCPs defining the path.
            mock_wait_time: Seconds to sleep when the tile generator is first advanced.
        """
        self.mock = mock

        # Pass-through mode needs none of the mock state below.
        if not mock:
            return

        self.mock_wait_time = mock_wait_time

        if mock_image_path is None:
            mock_image_path = "data/demo.jpg"  # jpg matters (original note; reason not recorded)

        # Read the pixels eagerly and release the file handle straight away.
        with Image.open(mock_image_path) as source:
            self._mock_img_full = np.asarray(source)

        self._output_dim = (200, 200)

        # Keep every GCP one tile diagonal away from the border so the
        # diagonal-sized crop taken around it stays inside the image.
        diag_len = np.sqrt(self._output_dim[0] ** 2 + self._output_dim[1] ** 2)
        self._gcps_pixels = self._generate_random_gcps(
            self._mock_img_full, mock_num_samples, padding=(diag_len, diag_len)
        )

        # maybe convert from pixels to lat/lon here

        self._path_pixels = self._build_path_pixels(self._gcps_pixels)

    def _generate_random_gcps(self, img, num_samples, padding=(0, 0)):
        """Draw ``num_samples`` random ``(x, y)`` pixel positions inside ``img``.

        Args:
            img: Array whose first two axes are (rows, columns).
            num_samples: Number of points to draw.
            padding: ``(pad_x, pad_y)`` margin kept free along each border;
                fractional values are rounded up.

        Returns:
            Integer array of shape ``(num_samples, 2)``; column 0 is x
            (bounded by the image width), column 1 is y (bounded by the height).

        Raises:
            ValueError: If the padding leaves no room to sample from.
        """
        pad_x, pad_y = (int(np.ceil(p)) for p in padding)
        height, width = img.shape[:2]

        # x is a column index and therefore limited by the width (shape[1]);
        # pairing it with shape[0] would push crops off non-square images.
        low = (pad_x, pad_y)
        high = (width - pad_x, height - pad_y)
        if high[0] <= low[0] or high[1] <= low[1]:
            raise ValueError(
                f"image of size {width}x{height} is too small for padding {(pad_x, pad_y)}"
            )

        return np.random.randint(low, high=high, size=(num_samples, 2))

    def _build_path_pixels(self, gcps):
        """Turn consecutive GCPs into a list of ``[x, y, heading_degrees]`` samples.

        Each segment is sampled with ``np.linspace`` (both end points included,
        coordinates truncated to ``uint32``).  Where the heading changes, three
        extra samples at the segment's last point sweep from the old heading
        to the new one.

        Args:
            gcps: Array-like of shape ``(n, 2)`` holding ``(x, y)`` positions.

        Returns:
            List of float arrays ``[x, y, angle]``; empty if fewer than two GCPs.
        """
        gcps = np.asarray(gcps, dtype=float)
        delta = np.diff(gcps, axis=0)

        # arctan2 is scale invariant, so the raw deltas can be used directly;
        # this avoids a 0/0 (NaN heading) when two GCPs coincide.
        angles = np.arctan2(delta[:, 1], delta[:, 0]) * 180 / np.pi

        # Wrap heading changes into [-180, 180) so a turn across the +/-180
        # seam goes the short way round instead of sweeping almost a full circle.
        turns = (np.diff(angles) + 180.0) % 360.0 - 180.0
        delta_angles = np.append(turns, 0)  # the last segment has no following turn

        path = []

        for start, end, angle, delta_angle in zip(gcps, gcps[1:], angles, delta_angles):
            num = int(np.linalg.norm(end - start) / self._STEP_PIXELS)
            line = np.linspace(start, end, num, dtype="uint32")
            path.extend(np.array([x, y, angle]) for x, y in line)

            # Nothing to sweep when the heading is unchanged, and nowhere to
            # anchor the sweep when the segment was too short to be sampled.
            if delta_angle == 0 or len(line) == 0:
                continue

            last_x, last_y = line[-1]
            path.extend(
                np.array([last_x, last_y, theta])
                for theta in np.linspace(angle, angle + delta_angle, 3)
            )

        return path

    def _next_image(self):
        """Generate one rotated, fixed-size tile per path sample.

        The optional sleep sits before the loop, so it happens once, when the
        generator is first advanced, not once per tile.
        NOTE(review): confirm whether a per-tile delay was intended.
        """
        if self.mock_wait_time > 0:
            time.sleep(self.mock_wait_time)

        # Crop a square as wide as the tile diagonal so the tile is still
        # fully covered after rotating by any angle.
        sample_diag = np.sqrt(self._output_dim[0] ** 2 + self._output_dim[1] ** 2)

        for x, y, theta in self._path_pixels:
            sample = self._crop_around(
                self._mock_img_full, (y, x), (sample_diag, sample_diag)
            )
            rotated = rotate(sample, -theta, reshape=False)
            yield self._center_crop(rotated, self._output_dim)

    def _crop_around(self, img, center, dim):
        """Cut a ``dim``-sized window out of ``img`` positioned around ``center``.

        Args:
            img: Array indexed as ``img[row, column, ...]``.
            center: ``(row, column)`` of the window centre.
            dim: ``(height, width)``; fractional values are truncated.

        Returns:
            The slice ``img[top:top + height, left:left + width]``.
        """
        # Plain Python ints keep the slice arithmetic free of unsigned wrap-around.
        height, width = (int(d) for d in np.array(dim).astype("uint32"))
        top = int(center[0] - height // 2)
        left = int(center[1] - width // 2)
        return img[top : top + height, left : left + width]

    def _center_crop(self, img, dim):
        """Return the central ``dim = (height, width)`` region of ``img``."""
        mid_row = img.shape[0] // 2
        mid_col = img.shape[1] // 2
        half_h = dim[0] // 2
        half_w = dim[1] // 2
        return img[
            mid_row - half_h : mid_row + half_h,
            mid_col - half_w : mid_col + half_w,
        ]

    def run(self, img=None):
        """Return the image source for this layer.

        In mock mode this is a generator of tiles and ``img`` is ignored.
        Otherwise ``img`` itself is returned, and an ``AssertionError`` is
        raised if it is None.
        """
        if not self.mock:
            assert img is not None, "Image cannot be None"
            return img

        return self._next_image()


class ObjectDetectionLayer:
    """Runs a detector on an image and reports vehicle boxes as integer rectangles.

    The model is built from an mmcv config and a checkpoint at construction
    time; ``run`` then returns one row per detection whose class name
    contains ``"vehicle"`` and whose score exceeds ``min_confidence``.
    """

    def __init__(
        self, config_file=None, checkpoint_file=None, device="cuda", min_confidence=0.3
    ):
        """Load the detector.

        Args:
            config_file: Model config path; defaults to the oriented R-CNN
                example config under ``../examples``.
            checkpoint_file: Weights path; defaults to the matching example checkpoint.
            device: Device string passed to checkpoint loading and ``model.to``.
            min_confidence: Detections scoring at or below this are dropped.
        """
        if config_file is None:
            config_file = "../examples/oriented_rcnn_r50_fpn_1x_dota_le90.py"
        if checkpoint_file is None:
            checkpoint_file = "../examples/oriented_rcnn_r50_fpn_1x_dota_le90-6d2b2ce0.pth"

        self.config_file = config_file
        self.checkpoint_file = checkpoint_file
        self.device = device

        self.model = self._load_model()
        self.min_confidence = min_confidence

    def _load_model(self):
        """Build the detector from the config, load its weights, and switch to eval mode."""
        config = mmcv.Config.fromfile(self.config_file)
        # The checkpoint supplies all weights, so no pretrained source is configured.
        config.model.pretrained = None

        model = build_detector(config.model)
        checkpoint = load_checkpoint(
            model, self.checkpoint_file, map_location=self.device
        )

        # Class names are taken from the checkpoint metadata and the config is
        # attached to the model before it is moved to the target device.
        model.CLASSES = checkpoint["meta"]["CLASSES"]
        model.cfg = config
        model.to(self.device)
        model = model.eval()

        return model

    def _get_bboxes_pixels(self, img):
        """Detect vehicles in ``img`` and return them as integer rectangles.

        Returns:
            Integer array of shape ``(n, 5)``; see ``_rects_from_detections``
            for the column layout.  The array is empty (``(0, 5)``) when the
            model has no class whose name contains ``"vehicle"``.
        """
        vehicle_classes = [
            i for i, c in enumerate(self.model.CLASSES) if "vehicle" in c
        ]

        # np.concatenate rejects an empty list, so report "no detections"
        # directly when the model knows no vehicle class.
        if not vehicle_classes:
            return np.empty((0, 5), dtype=int)

        inference = inference_detector(self.model, img)
        detections = np.concatenate(
            [inference[index] for index in vehicle_classes], axis=0
        )

        return self._rects_from_detections(detections, self.min_confidence)

    @staticmethod
    def _rects_from_detections(detections, min_confidence):
        """Filter raw detection rows by score and convert them to rectangles.

        Args:
            detections: 2-D array with at least six columns; column 5 is
                compared against ``min_confidence`` and the last column is
                reported as the confidence.  Presumably rows are
                ``(cx, cy, w, h, angle, score)`` -- confirm against the detector.
            min_confidence: Rows whose column 5 is not greater than this are dropped.

        Returns:
            Integer array of shape ``(n, 5)`` with columns
            ``[c1 - c2 // 2, c1 + c2 // 2, c0 - c2 // 2, c0 + c3, 100 * score]``
            where ``cK`` is input column K (values truncated to int).
        """
        kept = detections[detections[:, 5] > min_confidence]
        half = kept[:, 2] // 2

        # the bboxes are in a weird polygonal format, so we convert them to rectangles
        # NOTE(review): the fourth edge adds the full column 3 while the other
        # three use half of column 2, so the second pair is not centred on
        # column 0.  Kept as-is because consumers may rely on it -- confirm the
        # intended geometry.
        rects = np.array(
            [
                kept[:, 1] - half,
                kept[:, 1] + half,
                kept[:, 0] - half,
                kept[:, 0] + kept[:, 3],
                100 * kept[:, -1],  # confidence score
            ]
        )

        # follows the format of x0, x1, y0, y1, confidence
        return rects.astype(int).T

    def run(self, img):
        """Return the vehicle rectangles detected in ``img`` (pixel units)."""
        result = self._get_bboxes_pixels(img)

        # convert pixels to lat/lon here
        return result


class MavlinkInterfaceLayer:
    """Holds the gRPC channel and messaging stub used to forward detections.

    The instance can be used as a context manager so the channel is closed
    when the block exits.
    """

    def __init__(self, protos_path="protos", address="localhost:50051"):
        """Open an insecure channel and create the messaging stub.

        Args:
            protos_path: Stored on the instance; not otherwise used by this class.
            address: ``host:port`` of the messaging server.
        """
        self.protos_path = protos_path
        self.address = address
        self.channel = grpc.insecure_channel(address)
        self.stub = messaging_pb2_grpc.MessagingServiceStub(self.channel)

    def close(self):
        """Close the underlying gRPC channel."""
        self.channel.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never suppress exceptions raised inside the block

    def run(self, bboxes):
        """Print ``bboxes`` unless it is empty; always returns None."""
        # len() rather than truthiness: bboxes may be a multi-element NumPy array.
        if len(bboxes) > 0:
            print(bboxes)



In [3]:
# Assemble the pipeline: mock image source -> detector (on CPU) -> gRPC sink.
img_layer = ImageProcessingLayer(mock_wait_time=1)
obj_layer = ObjectDetectionLayer(device='cpu')
mav_layer = MavlinkInterfaceLayer()

# Centre of the geographic window that pixel coordinates are mapped onto.
lat_center = 29.643946
lon_center = -82.355659

# Degrees treated as one mile along each axis
# (presumably ~1/69 and ~1/55 of a degree -- confirm).
lat_mile = 0.0144927536231884
lon_mile = 0.0181818181818182

# The window reaches 15 such miles either side of the centre on both axes.
lat_min, lat_max = lat_center - (15 * lat_mile), lat_center + (15 * lat_mile)
lon_min, lon_max = lon_center - (15 * lon_mile), lon_center + (15 * lon_mile)

for img in img_layer.run():
    bboxes = obj_layer.run(img)
    mav_layer.run(bboxes)

    for bbox in bboxes:
        # Work on a float copy so the scaled values are not truncated by the
        # integer dtype of the detection rows.
        bbox = bbox.astype(float)

        # Linear map from the pixel range [0, 200] onto the window; 200 is the
        # side length of the mock tiles.
        # NOTE(review): the detector documents its rows as
        # x0, x1, y0, y1, confidence, so entry 1 is x1, not a second-axis
        # coordinate -- confirm which entries should become lat/lon.
        bbox[0] = float(bbox[0]) / 200 * (lat_max - lat_min) + lat_min
        bbox[1] = float(bbox[1]) / 200 * (lon_max - lon_min) + lon_min

        # Send the first four entries as text, without the brackets of the array repr.
        output = str(bbox[:4])[1:-1]
        print(output)
        response = mav_layer.stub.SendData(messaging_pb2.DataRequest(data=output))



load checkpoint from local path: ../examples/oriented_rcnn_r50_fpn_1x_dota_le90-6d2b2ce0.pth




[[109 121  56  68  76]
 [114 128  56  69  71]
 [102 116  55  67  71]
 [157 169  61  72  70]
 [ 95 109  54  67  67]
 [120 134  57  68  62]
 [150 164  59  71  60]
 [132 146  58  69  51]
 [126 140  57  69  47]
 [143 157  59  71  46]
 [138 152  58  71  41]]
 29 -82  56  68
 29 -82  56  69
 29 -82  55  67
 29 -82  61  72
 29 -82  54  67
 29 -82  57  68
 29 -82  59  71
 29 -82  58  69
 29 -82  57  69
 29 -82  59  71
 29 -82  58  71
[[109 121  56  68  76]
 [114 128  56  69  71]
 [102 116  55  67  71]
 [157 169  61  72  70]
 [ 95 109  54  67  67]
 [120 134  57  68  62]
 [150 164  59  71  60]
 [132 146  58  69  51]
 [126 140  57  69  47]
 [143 157  59  71  46]
 [138 152  58  71  41]]
 29 -82  56  68
 29 -82  56  69
 29 -82  55  67
 29 -82  61  72
 29 -82  54  67
 29 -82  57  68
 29 -82  59  71
 29 -82  58  69
 29 -82  57  69
 29 -82  59  71
 29 -82  58  71
[[55 69 84 96 53]
 [57 71 77 90 52]
 [58 72 72 84 32]
 [68 82 30 42 32]]
 29 -82  84  96
 29 -82  77  90
 29 -82  72  84
 29 -82  30  42
[[ 

KeyboardInterrupt: 