In [5]:
%pip install opencv-python-headless numpy torch shapely ultralytics redis

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


### Импорт необходимых библиотек:
- **cv2**: Библиотека OpenCV для работы с изображениями и видео.
- **numpy**: Библиотека для работы с массивами и числовыми вычислениями.
- **time**: Модуль для работы с временем.
- **torch**: Библиотека PyTorch для работы с нейронными сетями.
- **shapely.geometry**: Модуль для работы с геометрическими объектами, такими как полигоны и точки.

In [16]:
import cv2
import numpy as np
import time
import torch
from ultralytics import YOLO
from shapely.geometry import Polygon, Point
import redis
import json

In [17]:
r = redis.Redis(host='localhost', port=6379, db=0)

In [18]:
class ParkingSpaceMonitor:
    def __init__(self, video_source, yolo_model_path, parking_id, parking_spaces_coords, check_interval=10, 
                 consecutive_checks=3, confidence_threshold=0.5):
        """Инициализация системы мониторинга парковочных мест"""
        self.video_source = video_source
        self.yolo_model_path = yolo_model_path
        self.check_interval = check_interval
        self.consecutive_checks = consecutive_checks
        self.confidence_threshold = confidence_threshold
        
        # Инициализация модели YOLO
        self.model = YOLO(yolo_model_path)
        
        # Только один класс - Vehicle с ID 0
        self.vehicle_classes = [0]  # Только ID 0 для класса Vehicle
        self.class_names = {0: 'Vehicle'}
        
        # Инициализация парковочных мест
        self.parking_id = parking_id
        self.parking_spaces = {}
        for space_id, coords in parking_spaces_coords.items():
            self.parking_spaces[space_id] = {
                'polygon': Polygon(coords),
                'status': 'free',
                'history': []
            }
    
    def detect_objects(self, frame):
        """Обнаруживает объекты на кадре с помощью модели YOLO"""
        results = self.model(frame)
        result = results[0]  # Get the first result
        
        vehicles = []
        for box in result.boxes:
            cls_id = int(box.cls.item())
            
            # Проверяем, что обнаруженный объект - транспортное средство (class_id = 0)
            if cls_id in self.vehicle_classes:
                confidence = float(box.conf.item())
                
                # Только если уверенность выше порога
                if confidence >= self.confidence_threshold:
                    # Получаем координаты ограничивающего прямоугольника
                    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                    center_x = (x1 + x2) // 2
                    center_y = (y1 + y2) // 2
                    
                    vehicles.append({
                        'center': Point(center_x, center_y),
                        'bbox': (x1, y1, x2, y2),
                        'class': self.class_names.get(cls_id, 'Vehicle'),
                        'confidence': confidence
                    })
        
        return vehicles

    def check_parking_spaces(self, vehicles, occupation_threshold=0.4):
        """Проверяет занятость парковочных мест"""
        current_status = {}
        
        for space_id, space_info in self.parking_spaces.items():
            space_polygon = space_info['polygon']
            space_area = space_polygon.area
            occupied_area = 0
            
            for vehicle in vehicles:
                # Создаем полигон из bbox автомобиля
                x1, y1, x2, y2 = vehicle['bbox']
                vehicle_polygon = Polygon([(x1, y1), (x2, y1), (x2, y2), (x1, y2)])
                
                # Если есть пересечение, добавляем площадь пересечения
                if space_polygon.intersects(vehicle_polygon):
                    intersection = space_polygon.intersection(vehicle_polygon)
                    occupied_area += intersection.area
            
            # Вычисляем процент занятости
            occupation_percentage = occupied_area / space_area if space_area > 0 else 0
            
            # Если процент занятости выше порога, считаем место занятым
            current_status[space_id] = 'occupied' if occupation_percentage >= occupation_threshold else 'free'
        
        return current_status
    
    def update_parking_status(self, current_status):
        """Обновляет статусы парковочных мест и возвращает список изменений"""
        status_changes = []

        for space_id, current in current_status.items():
            self.parking_spaces[space_id]['history'].append(current)

            if len(self.parking_spaces[space_id]['history']) > self.consecutive_checks:
                self.parking_spaces[space_id]['history'].pop(0)

            if len(self.parking_spaces[space_id]['history']) == self.consecutive_checks:
                if all(status == current for status in self.parking_spaces[space_id]['history']):
                    if current != self.parking_spaces[space_id]['status']:
                        status_changes.append((space_id, self.parking_spaces[space_id]['status'], current))
                        self.parking_spaces[space_id]['status'] = current

        return status_changes

    def visualize(self, frame, parking_spaces):
        """Отображает на кадре статус парковочных мест"""
        for space_id, space_info in parking_spaces.items():
            coords = np.array(space_info['polygon'].exterior.coords, np.int32)
            # Зеленый для свободного, красный для занятого
            color = (0, 255, 0) if space_info['status'] == 'free' else (0, 0, 255)
            cv2.polylines(frame, [coords], True, color, 2)
            cv2.putText(frame, f"ID: {space_id}", coords[0], cv2.FONT_HERSHEY_SIMPLEX, 
                        0.6, color, 2)
        
        return frame
    
    def run(self):
        """Main loop for monitoring parking spaces using video processing approach"""
        # Открываем видеофайл
        cap = cv2.VideoCapture(self.video_source)
        
        # Получаем свойства видео
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = total_frames / fps
        print(f"Длительность видео: {duration:.2f} секунд, FPS: {fps}")
        
        # Создаем видеозапись для вывода
        output_path = 'parking_monitor_output.mp4'
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        
        # Рассчитываем интервал кадров для анализа
        frame_interval = int(fps * self.check_interval)
        current_frame = 0
        last_vehicles = []
        
        # Обрабатываем все кадры
        while True:
            # Читаем кадр
            success, frame = cap.read()
            if not success or frame is None:
                break
                
            # Запускаем обнаружение и анализ через указанные интервалы
            if current_frame % frame_interval == 0:
                # print(f"Обработка кадра {current_frame} в момент {current_frame/fps:.2f} секунд")
                
                # Обнаруживаем объекты на кадре
                last_vehicles = self.detect_objects(frame)
                # print(f"Обнаружено транспортных средств: {len(last_vehicles)}")
                
                # Проверяем занятость парковочных мест
                current_status = self.check_parking_spaces(last_vehicles)
                
                # Обновляем статусы парковочных мест
                status_changes = self.update_parking_status(current_status)
                
                # Выводим изменения статуса в консоль и в редис
                for space_id, old_status, new_status in status_changes:
                    print(f"Парковочное место {space_id} изменило статус с '{old_status}' на '{new_status}'")
                    key = f"parking:{self.parking_id}:space:{space_id}"
                    r.hset(key, mapping={
                        "status": new_status,
                    })
                    channel = f"parking:{self.parking_id}:updates"
                    r.publish(channel, json.dumps({
                        "space_id": space_id,
                        "status": new_status,
                        "timestamp": int(time.time())
                    }))
            
            # Визуализируем результаты
            vis_frame = self.visualize(frame.copy(), self.parking_spaces)
            
            # Отображаем обнаруженные транспортные средства
            for vehicle in last_vehicles:
                x1, y1, x2, y2 = vehicle['bbox']
                cv2.rectangle(vis_frame, (x1, y1), (x2, y2), (255, 165, 0), 2)  # Оранжевый цвет
                label = f"{vehicle['class']}: {vehicle['confidence']:.2f}"
                cv2.putText(vis_frame, label, (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 
                        0.5, (255, 165, 0), 2)
            
            # Записываем кадр в выходное видео
            out.write(vis_frame)
            
            current_frame += 1
            
            # Периодически выводим прогресс обработки
            # if current_frame % int(fps) == 0:
            #     print(f"Обработка: {current_frame}/{total_frames} кадров ({current_frame/total_frames*100:.1f}%)")
        
        # Освобождаем ресурсы
        cap.release()
        out.release()
        cv2.destroyAllWindows()
        print(f"Видео сохранено в {output_path}")

# Система мониторинга парковочных мест с использованием компьютерного зрения

## Общее описание

Представленный код реализует автоматизированную систему мониторинга парковочных мест с использованием компьютерного зрения и нейронных сетей. Система определяет статус каждого парковочного места (занято/свободно) путем анализа видеопотока с камеры наблюдения.

## Принцип работы

1. **Обнаружение транспортных средств**: Используется предобученная модель YOLO для обнаружения автомобилей на видео.
2. **Определение парковочных мест**: Каждое место задается многоугольником (полигоном) с определенными координатами.
3. **Анализ занятости**: Система определяет пересечение обнаруженных автомобилей с зонами парковочных мест.
4. **Обновление статуса**: Отслеживание изменений статуса парковочных мест во времени.
5. **Визуализация**: Отображение результатов на видео с цветовой индикацией (зеленый - свободно, красный - занято).

## Особенности реализации

- Устойчивость к шумам благодаря механизму последовательных проверок
- Поддержка произвольной геометрии парковочных мест
- Визуализация результатов в реальном времени
- Сохранение обработанного видео с отмеченными парковочными местами и их статусом

## Применение

Система может использоваться для автоматизированного мониторинга парковок, информирования о наличии свободных мест и сбора статистики по использованию парковочного пространства.

## Технические детали реализации

### Обнаружение объектов
- Используется легковесная модель YOLO, обученная для обнаружения транспортных средств
- Порог уверенности детекции: 0.5 (настраиваемый параметр)

### Алгоритм определения занятости
1. Для каждого парковочного места рассчитывается процент перекрытия с обнаруженными автомобилями
2. Если процент перекрытия превышает пороговое значение (по умолчанию 0.4), место считается занятым
3. Для исключения ложных срабатываний используется механизм последовательных проверок (по умолчанию 2-3 проверки)

### Производительность
- Обработка видео происходит с заданной частотой кадров (по умолчанию 5 FPS)
- При обнаружении изменения статуса парковочного места выводится уведомление
- Результаты сохраняются в виде обработанного видео с визуальной индикацией

### Параметры настройки
- `check_interval`: интервал между проверками статуса (в секундах)
- `consecutive_checks`: количество последовательных проверок для подтверждения изменения статуса
- `confidence_threshold`: порог уверенности для обнаружения транспортных средств

## Документация по методам

### __init__(self, video_source, yolo_model_path, parking_spaces_coords, check_interval=10, consecutive_checks=3, confidence_threshold=0.5)
Инициализирует систему мониторинга парковочных мест с заданными параметрами.
- `video_source` - путь к видеофайлу или ID камеры
- `yolo_model_path` - путь к файлу модели YOLO
- `parking_spaces_coords` - словарь с координатами парковочных мест
- `check_interval` - интервал проверки статуса в секундах
- `consecutive_checks` - количество последовательных проверок для подтверждения изменения статуса
- `confidence_threshold` - порог уверенности для детекции транспортных средств

### detect_objects(self, frame)
Обнаруживает транспортные средства на кадре и возвращает список объектов.
- `frame` - изображение для анализа
- Возвращает список обнаруженных транспортных средств с их характеристиками

### check_parking_spaces(self, vehicles, occupation_threshold=0.4)
Анализирует пересечения обнаруженных транспортных средств с парковочными местами.
- `vehicles` - список обнаруженных транспортных средств
- `occupation_threshold` - минимальный процент перекрытия для признания места занятым
- Возвращает словарь со статусами парковочных мест

### update_parking_status(self, current_status)
Обновляет статусы парковочных мест с учетом истории наблюдений.
- `current_status` - текущие обнаруженные статусы парковочных мест
- Возвращает список изменений статусов парковочных мест

### visualize(self, frame, parking_spaces)
Отображает на кадре парковочные места с их текущими статусами.
- `frame` - исходный кадр для визуализации
- `parking_spaces` - информация о парковочных местах
- Возвращает кадр с нанесенной визуализацией

### run(self)
Основной метод системы, запускающий обработку видеопотока.
- Обрабатывает видео с заданными параметрами
- Сохраняет результат в выходной видеофайл

In [19]:
# Пример использования:
    # Определяем координаты парковочных мест
    # Формат: {id_места: [(x1, y1), (x2, y2), (x3, y3), (x4, y4)], ...}
parking_spaces = {
    1: [(348, 73), (402, 77), (423, 119), (359, 111)],
    2: [(293, 60), (346, 70), (355, 108), (285, 100)],
    3: [(232, 49), (289, 59), (283, 107), (207, 96)],
    4: [(176, 48), (235, 52), (206, 95), (140, 89)],
    5: [(127, 43), (177, 48), (139, 88), (86, 84)],
    # Добавьте остальные парковочные места по мере необходимости
}
    
    # Инициализируем и запускаем мониторинг
monitor = ParkingSpaceMonitor(
    video_source='./vid.mp4',
    yolo_model_path='car-75e-11n.pt',
    parking_id='0',
    parking_spaces_coords=parking_spaces,
    check_interval=2,
    consecutive_checks=2,
    confidence_threshold=0.5
)
monitor.run()


Длительность видео: 242.20 секунд, FPS: 25.0

0: 512x640 6 Vehicles, 297.7ms
Speed: 34.2ms preprocess, 297.7ms inference, 1.7ms postprocess per image at shape (1, 3, 512, 640)

0: 512x640 7 Vehicles, 494.0ms
Speed: 3.1ms preprocess, 494.0ms inference, 0.9ms postprocess per image at shape (1, 3, 512, 640)
Парковочное место 1 изменило статус с 'free' на 'occupied'
Парковочное место 2 изменило статус с 'free' на 'occupied'
Парковочное место 3 изменило статус с 'free' на 'occupied'
Парковочное место 4 изменило статус с 'free' на 'occupied'
Парковочное место 5 изменило статус с 'free' на 'occupied'

0: 512x640 7 Vehicles, 515.9ms
Speed: 39.5ms preprocess, 515.9ms inference, 4.8ms postprocess per image at shape (1, 3, 512, 640)

0: 512x640 7 Vehicles, 333.8ms
Speed: 2.5ms preprocess, 333.8ms inference, 1.3ms postprocess per image at shape (1, 3, 512, 640)

0: 512x640 7 Vehicles, 270.3ms
Speed: 11.7ms preprocess, 270.3ms inference, 7.5ms postprocess per image at shape (1, 3, 512, 640)

0: 512