In [None]:
# ===== Импорты и инициализация =====

#!pip install scikit-learn

from grid.model.navigation.objectinspection import ObjectInspect
from grid.model.navigation.visualservoing import VisualServoing
from grid.model.perception.detection.rt_detr import RT_DETR
from grid.model.perception.depth.metric3d import Metric3D

import airgen
import numpy as np
import rerun as rr
import cv2
import time
from math import cos, sin, pi
from sklearn.cluster import DBSCAN

# Инициализация моделей
nav_inspect = ObjectInspect()
nav_visualservoing = VisualServoing()
detection_rt_detr = RT_DETR(use_local=True)  # Инициализация модели детекции

In [None]:
# ===== Подключение к дрону и взлёт =====

client = airgen.MultirotorClient()
client.confirmConnection()
client.enableApiControl(True)
client.armDisarm(True)
# ===== Взлет дрона =====

#client.takeoffAsync().join()

# ===== Посадка дрона =====

#client.landAsync().join()

Connected!
Client Ver:1 (Min Req: 1), Server Ver:1 (Min Req: 1)



True

In [None]:
# ===== Получение и логирование GPS =====

gps_data = client.getGpsData()
geo = gps_data.gnss.geo_point
gps_position = np.array([[geo.latitude, geo.longitude, geo.altitude]], dtype=np.float32)
rr.log("drone/gps", rr.Points3D(gps_position))

In [None]:
# ===== Получение и подготовка данных LiDAR + Построение маршрута =====

lidar_data = client.getLidarData()
points = np.array(lidar_data.point_cloud, dtype=np.float32).reshape(-1, 3)
rr.log('lidar/points', rr.Points3D(points))

# ===== Фильтрация: исключаем "землю" (высота < 0.2 м) =====
above_ground = points[points[:, 2] > 0.2]

object_centers = []
object_distances = []

if above_ground.shape[0] > 0:
    # ===== Кластеризация (DBSCAN) =====
    clustering = DBSCAN(eps=1.0, min_samples=10).fit(above_ground[:, :2])
    labels = clustering.labels_

    # ===== Обработка кластеров =====
    unique_labels = set(labels)
    for label in unique_labels:
        if label == -1:
            continue  # -1 означает шум

        cluster_points = above_ground[labels == label]
        center = cluster_points.mean(axis=0)
        distance = np.linalg.norm(center[:2])
        object_centers.append(center)
        object_distances.append(distance)

    rr.log("lidar/objects", rr.Points3D(np.array(object_centers)))
    print(f"🔎 Обнаружено объектов по LiDAR: {len(object_centers)}")
    for i, dist in enumerate(object_distances):
        print(f" - Объект {i+1}: ~{dist:.1f} м")
else:
    print("⚠️ Нет данных выше уровня земли для кластеризации")

# ===== Построение кругового маршрута =====
start = client.getMultirotorState().kinematics_estimated.position
takeoff_alt = -15  # вверх в NED
center = start
radius = 10
steps = 12

trajectory = []
for i in range(steps):
    angle = 2 * pi * (i / steps)
    x = center.x_val + radius * cos(angle)
    y = center.y_val + radius * sin(angle)
    z = takeoff_alt
    trajectory.append(airgen.Vector3r(x, y, z))

trajectory.append(airgen.Vector3r(center.x_val, center.y_val, takeoff_alt))  # возврат

rr.log("telemetry/path", rr.LineStrips3D([[(p.x_val, p.y_val, p.z_val) for p in trajectory]]))


⚠️ Нет данных выше уровня земли для кластеризации


In [None]:
# ===== Плавное движение по маршруту с ML-обходом препятствий =====

def is_obstacle_near(current_pos, obstacles, threshold=2.0):
    for obstacle in obstacles:
        dist = np.linalg.norm(np.array([current_pos.x_val, current_pos.y_val]) - obstacle[:2])
        if dist < threshold:
            return True, dist
    return False, None

def find_safe_direction(current_pos, obstacles, radius=5.0, angle_step=15):
    best_point = None
    min_density = float('inf')

    for angle in range(0, 360, angle_step):
        rad = np.deg2rad(angle)
        test_x = current_pos.x_val + radius * cos(rad)
        test_y = current_pos.y_val + radius * sin(rad)

        density = 0
        for obs in obstacles:
            dist = np.linalg.norm(np.array([test_x, test_y]) - obs[:2])
            if dist < 2.0:
                density += 1

        if density < min_density:
            min_density = density
            best_point = airgen.Vector3r(test_x, test_y, current_pos.z_val)

    return best_point

for target_point in trajectory:
    while True:
        # Получаем текущую позицию и LiDAR
        current_pos = client.getMultirotorState().kinematics_estimated.position
        lidar_data = client.getLidarData()
        points = np.array(lidar_data.point_cloud, dtype=np.float32).reshape(-1, 3)
        rr.log('lidar/points', rr.Points3D(points))

        above_ground = points[points[:, 2] > 0.2]

        if above_ground.shape[0] == 0:
            print("⚠️ Нет точек выше уровня земли — пропускаю кластеризацию")
            obstacle_centers = np.empty((0, 3))
        else:
            clustering = DBSCAN(eps=1.0, min_samples=10).fit(above_ground[:, :2])
            labels = clustering.labels_

            obstacle_centers = []
            for label in set(labels):
                if label == -1:
                    continue
                cluster_points = above_ground[labels == label]
                center = cluster_points.mean(axis=0)
                obstacle_centers.append(center)

            obstacle_centers = np.array(obstacle_centers)

        rr.log("lidar/obstacles", rr.Points3D(obstacle_centers))

        # Проверка на наличие препятствий
        danger, dist = is_obstacle_near(current_pos, obstacle_centers)
        if danger:
            print(f"⚠️ Обнаружено препятствие на расстоянии {dist:.1f} м. Ищу обходной путь...")
            safe_point = find_safe_direction(current_pos, obstacle_centers)
            if safe_point:
                print("🔁 Перестроен маршрут. Лечу к безопасной точке...")
                client.moveToPositionAsync(safe_point.x_val, safe_point.y_val, safe_point.z_val, 2).join()
                print("✅ Достиг безопасной точки. Возобновляю маршрут.")
                continue
        break

    client.moveToPositionAsync(target_point.x_val, target_point.y_val, target_point.z_val, 3).join()

# ===== Завершающая посадка =====
landing_point = airgen.Vector3r(center.x_val, center.y_val, -2)
client.moveToPositionAsync(landing_point.x_val, landing_point.y_val, landing_point.z_val, 1).join()


⚠️ Нет точек выше уровня земли — пропускаю кластеризацию


⚠️ Обнаружено препятствие на расстоянии 1.0 м. Ищу обходной путь...
🔁 Перестроен маршрут. Лечу к безопасной точке...


✅ Достиг безопасной точки. Возобновляю маршрут.


AttributeError: 'numpy.ndarray' object has no attribute 'x_val'