1. Узел обнаружения объектов на изображениях:

In [None]:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from detection_msgs.msg import BoundingBox, ObjectDetection
import cv2
import numpy as np

class ObjectDetectionNode(Node):
    def __init__(self):
        super().__init__('object_detection')
        self.subscription = self.create_subscription(
            Image, 'camera/image_raw', self.image_callback, 10)
        self.publisher = self.create_publisher(
            ObjectDetection, 'object_detections', 10)
        self.bridge = CvBridge()
        self.model = self.load_pretrained_model()

    def load_pretrained_model(self):
        # Загрузка предварительно обученной модели для обнаружения объектов на изображениях
        model = cv2.dnn.readNetFromTensorflow('path/to/model.pb', 'path/to/model.pbtxt')
        return model

    def image_callback(self, msg):
        image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
        blob = cv2.dnn.blobFromImage(image, size=(300, 300), swapRB=True, crop=False)
        self.model.setInput(blob)
        detections = self.model.forward()

        detection_msg = ObjectDetection()
        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence > 0.5:
                x = int(detections[0, 0, i, 3] * image.shape[1])
                y = int(detections[0, 0, i, 4] * image.shape[0])
                width = int(detections[0, 0, i, 5] * image.shape[1]) - x
                height = int(detections[0, 0, i, 6] * image.shape[0]) - y
                class_id = int(detections[0, 0, i, 1])

                box_msg = BoundingBox()
                box_msg.x = x
                box_msg.y = y
                box_msg.width = width
                box_msg.height = height
                box_msg.class_id = class_id
                box_msg.score = confidence
                detection_msg.bounding_boxes.append(box_msg)

        self.publisher.publish(detection_msg)

def main(args=None):
    rclpy.init(args=args)
    node = ObjectDetectionNode()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()


2. Узел обнаружения объектов на лидарных данных:

In [None]:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2
from detection_msgs.msg import BoundingBox, ObjectDetection
import open3d as o3d

class LidarObjectDetectionNode(Node):
    def __init__(self):
        super().__init__('lidar_object_detection')
        self.subscription = self.create_subscription(
            PointCloud2, 'lidar/point_cloud', self.pointcloud_callback, 10)
        self.publisher = self.create_publisher(
            ObjectDetection, 'lidar_object_detections', 10)
        self.model = self.load_lidar_object_detection_model()

    def load_lidar_object_detection_model(self):
        # Загрузка предварительно обученной модели для обнаружения объектов в данных лидара
        model = o3d.ml.torch.models.PointNet2SSGClassification(device='cuda')
        model.load_state_dict(torch.load('path/to/model.pth'))
        return model

    def pointcloud_callback(self, msg):
        cloud = self.convert_pointcloud2_to_open3d(msg)
        detections = self.model.detect(cloud)

        detection_msg = ObjectDetection()
        for bbox, class_id, score in detections:
            box_msg = BoundingBox()
            box_msg.x = bbox[0]
            box_msg.y = bbox[1]
            box_msg.width = bbox[2]
            box_msg.height = bbox[3]
            box_msg.class_id = class_id
            box_msg.score = score
            detection_msg.bounding_boxes.append(box_msg)

        self.publisher.publish(detection_msg)

    def convert_pointcloud2_to_open3d(self, msg):
        # Преобразование ROS PointCloud2 в облако точек Open3D
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(np.asarray(msg.data).reshape(-1, 3))
        return pcd

def main(args=None):
    rclpy.init(args=args)
    node = LidarObjectDetectionNode()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()


3. Узел объединения датчиков:

In [None]:
import rclpy
from rclpy.node import Node
from detection_msgs.msg import BoundingBox, ObjectDetection, FusedObjectDetection

class SensorFusionNode(Node):
    def __init__(self):
        super().__init__('sensor_fusion')
        self.rgb_sub = self.create_subscription(
            ObjectDetection, 'object_detections', self.rgb_callback, 10)
        self.lidar_sub = self.create_subscription(
            ObjectDetection, 'lidar_object_detections', self.lidar_callback, 10)
        self.publisher = self.create_publisher(
            FusedObjectDetection, 'fused_object_detections', 10)
        self.rgb_detections = None
        self.lidar_detections = None

    def rgb_callback(self, rgb_detections):
        self.rgb_detections = rgb_detections

    def lidar_callback(self, lidar_detections):
        self.lidar_detections = lidar_detections
        fused_detections = self.fuse_detections(self.rgb_detections, self.lidar_detections)
        fused_msg = FusedObjectDetection()
        for bbox, class_id, score in fused_detections:
            box_msg = BoundingBox()
            box_msg.x = bbox[0]
            box_msg.y = bbox[1]
            box_msg.width = bbox[2]
            box_msg.height = bbox[3]
            box_msg.class_id = class_id
            box_msg.score = score
            fused_msg.bounding_boxes.append(box_msg)
        self.publisher.publish(fused_msg)

    def fuse_detections(self, rgb_detections, lidar_detections):
        # Реализация алгоритма объединения детекций
        # Например, использование пространственной близости и классификации
        fused_detections = []
        if rgb_detections and lidar_detections:
            for rgb_bbox, rgb_class, rgb_score in rgb_detections.bounding_boxes:
                for lidar_bbox, lidar_class, lidar_score in lidar_detections.bounding_boxes:
                    if self.is_bbox_overlap(rgb_bbox, lidar_bbox):
                        fused_bbox = self.get_fused_bbox(rgb_bbox, lidar_bbox)
                        fused_class = self.get_fused_class(rgb_class, lidar_class)
                        fused_score = (rgb_score + lidar_score) / 2
                        fused_detections.append((fused_bbox, fused_class, fused_score))
        return fused_detections

    def is_bbox_overlap(self, bbox1, bbox2):
        # Проверка пересечения ограничивающих боксов
        pass

    def get_fused_bbox(self, bbox1, bbox2):
        # Вычисление объединенного ограничивающего бокса
        pass

    def get_fused_class(self, class1, class2):
        # Объединение классификации
        pass

def main(args=None):
    rclpy.init(args=args)
    node = SensorFusionNode()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()


1. ObjectDetectionNode отвечает за обнаружение объектов на изображениях, используя предварительно обученную модель.
2. LidarObjectDetectionNode отвечает за обнаружение объектов в данных лидара, используя предварительно обученную модель.
3. SensorFusionNode отвечает за объединение результатов обнаружения объектов из камеры и лидара, используя алгоритм пространственной близости и объединения классификации.

Эти узлы работают вместе, образуя конвейер обработки, который максимально эффективен и быстр. Каждый узел подписывается на необходимые темы и публикует результаты в соответствующих темах, что обеспечивает четкую и модульную связь между ними.

Разделение задач на независимые узлы предотвращает использование плохих шаблонов проектирования, таких как монолитный узел, который пытается выполнять все задачи. Это улучшает поддерживаемость, масштабируемость и модульность системы.