In [None]:
# Веса модели и различные сопустствующие функции можно взять из репо ниже
# !git clone https://github.com/paveldat/object_detection_on_video.git

# Установим необходимые библиотеки
# !pip install -r requirements.txt

In [None]:
#Библиотеки
import cv2
import numpy as np
import pandas as pd
from art import tprint
import matplotlib.pylab as plt
import requests

# Это необходимо заполнить для работы с telegram ботом
TOKEN = "Your_token"

In [None]:
import requests
import time

bot_token = TOKEN
message = "Подключаю отслеживание паркомест"

# Функция для получения обновлений от бота
def get_updates(bot_token, offset=None):
    url = f"https://api.telegram.org/bot{bot_token}/getUpdates"
    params = {'timeout': 100, 'offset': offset}
    response = requests.get(url, params=params)
    return response.json()

# Функция для отправки сообщения
def send_message(bot_token, chat_id, message):
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    payload = {'chat_id': chat_id, 'text': message}
    response = requests.post(url, data=payload)
    return response.json()

# Основной цикл для получения обновлений и отправки сообщения новым пользователям

print("Бот запущен...")
offset = None
known_chat_ids = set()

flag = True
while flag:
    updates = get_updates(bot_token, offset)
    
    if "result" in updates:
        for update in updates["result"]:
            offset = update["update_id"] + 1
            chat_id = update["message"]["chat"]["id"]
            
            # Проверяем, известен ли нам chat_id
            if chat_id not in known_chat_ids:
                send_message(bot_token, chat_id, message)
                known_chat_ids.add(chat_id)  # Добавляем chat_id в список отправленных
                flag = False
                
    time.sleep(1)

chat_id = next(iter(known_chat_ids))
print ("Chat id = ", chat_id)

In [None]:
# Пути до конфигурации и весов модели
path_conf = "./object_detection_on_video/Resources/yolov4-tiny.cfg"
path_weights = "./object_detection_on_video/Resources/yolov4-tiny.weights"
path_coco_names = "./object_detection_on_video/Resources/coco.names.txt"

video_path = './final_test_2.mp4'

# Словарь для отслеживания состояния каждого парковочного места
parking_states = {}  # {'parking_id': {'state': 'free'/'occupied', 'counter': int}}

# Порог стабильности (количество кадров)
STABILITY_THRESHOLD = 100

# Функции для подсчета Intersection over Union (IoU)
def calculate_iou(box, boxes, box_area, boxes_area):
    y1 = np.maximum(box[0], boxes[:, 0])
    y2 = np.minimum(box[2]+box[0], boxes[:, 2]+boxes[:, 0])
    x1 = np.maximum(box[1], boxes[:, 1])
    x2 = np.minimum(box[3]+box[1], boxes[:, 3]+boxes[:, 1])
    intersection = np.maximum(x2 - x1, 0) * np.maximum(y2 - y1, 0)
    union = box_area + boxes_area[:] - intersection[:]
    return intersection / union

def compute_overlaps(boxes1, boxes2):
    area1 = boxes1[:, 2] * boxes1[:, 3]
    area2 = boxes2[:, 2] * boxes2[:, 3]
    overlaps = np.zeros((boxes1.shape[0], boxes2.shape[0]))
    for i in range(overlaps.shape[1]):
        box2 = boxes2[i]
        overlaps[:, i] = calculate_iou(box2, boxes1, area2[i], area1)
    return overlaps

def draw_bbox(x, y, w, h, parking_text, parking_color=(0, 255, 0)):
    start = (x, y)
    end = (x + w, y + h)
    final_image = cv2.rectangle(image_to_process, start, end, parking_color, 2)
    final_image = cv2.putText(final_image, parking_text, (x, y - 10),
                              cv2.FONT_HERSHEY_SIMPLEX, 0.4, parking_color, 1, cv2.LINE_AA)
    return final_image

def send_photo_file(chat_id, img):
    files = {'photo': open(img, 'rb')}
    requests.post(f'https://api.telegram.org/bot{TOKEN}/sendPhoto?chat_id={chat_id}', files=files)

def send_telegram_message(message):
    requests.get(f'https://api.telegram.org/bot{TOKEN}/sendMessage?chat_id={chat_id}&text={message}').json()

# Функция для проверки стабильного изменения статуса парковочного места
def check_parking_status(parking_id, is_free, screenshot):
    global parking_states
    
    # Получаем текущее состояние парковочного места и его счетчик
    current_state_info = parking_states.get(parking_id, {'state': 'occupied', 'counter': 0})
    current_state = current_state_info['state']
    counter = current_state_info['counter']
    
    # Определяем новое состояние
    new_state = 'free' if is_free else 'occupied'
    
    # Если состояние изменилось, увеличиваем счетчик
    if current_state != new_state:
        counter += 1  # Увеличиваем счетчик, так как потенциальное состояние изменилось
        # Если состояние стабильно на протяжении заданного порога, фиксируем изменение и отправляем уведомление
        if counter >= STABILITY_THRESHOLD:
            if new_state == 'free':
                message = f'Парковочное место {parking_id} теперь свободно!'
                send_telegram_message(message)
                cv2.imwrite(f'./image_test_free_{parking_id}.png', screenshot)
                send_photo_file(chat_id, f'./image_test_free_{parking_id}.png')
            else:
                message = f'Парковочное место {parking_id} теперь занято.'
                send_telegram_message(message)
                cv2.imwrite(f'./image_test_not_free_{parking_id}.png', screenshot)
                send_photo_file(chat_id, f'./image_test_not_free_{parking_id}.png')
            
            # Обновляем состояние и сбрасываем счетчик
            parking_states[parking_id] = {'state': new_state, 'counter': 0}
        else:
            # Обновляем счетчик, если состояние еще не стабильно
            parking_states[parking_id]['counter'] = counter
    else:
        # Если состояние совпадает с предыдущим, сбрасываем счетчик
        parking_states[parking_id]['counter'] = 0

net = cv2.dnn.readNetFromDarknet(path_conf, path_weights)
layer_names = net.getLayerNames()
out_layers_indexes = net.getUnconnectedOutLayers()
out_layers = [layer_names[index - 1] for index in out_layers_indexes]

first_frame_parking_spaces = None
parking_spaces = []  # для хранения координат парковочных мест

video_capture = cv2.VideoCapture(video_path)

while video_capture.isOpened():
    ret, image_to_process = video_capture.read()
    if not ret:
        break  # Завершить, если видео закончилось

    # Инициализируем `final_image` текущим кадром
    final_image = image_to_process.copy()

    height, width, _ = image_to_process.shape
    blob = cv2.dnn.blobFromImage(image_to_process, 1 / 255, (608, 608), (0, 0, 0), swapRB=True, crop=False)
    net.setInput(blob)
    outs = net.forward(out_layers)
    class_indexes, class_scores, boxes = ([] for i in range(3))

    for out in outs:
        for obj in out:
            scores = obj[5:]
            class_index = np.argmax(scores)
            if class_index == 2:  # Если класс 'car'
                class_score = scores[class_index]
                if class_score > 0:
                    center_x = int(obj[0] * width)
                    center_y = int(obj[1] * height)
                    obj_width = int(obj[2] * width)
                    obj_height = int(obj[3] * height)
                    box = [center_x - obj_width // 2, center_y - obj_height // 2, obj_width, obj_height]
                    boxes.append(box)
                    class_indexes.append(class_index)
                    class_scores.append(float(class_score))
        
    if not first_frame_parking_spaces:
        # При первом кадре определяем все парковочные места
        first_frame_parking_spaces = boxes
        parking_spaces = first_frame_parking_spaces
        # Инициализируем состояние парковочных мест как занятые
        for i in range(len(parking_spaces)):
            parking_states[i] = {'state': 'occupied', 'counter': 0}
            x, y, w, h = parking_spaces[i]
            parking_text = f'Parking {i}'
            final_image = draw_bbox(x, y, w, h, parking_text, (255, 0, 0))
    else:
        chosen_cars_boxes = cv2.dnn.NMSBoxes(boxes, class_scores, 0.0, 0.25)
        cars_area = []
        for box_index in chosen_cars_boxes:
            car_box = boxes[box_index]
            cars_area.append(car_box)
            x, y, w, h = car_box
            parking_text = 'Car'
            final_image = draw_bbox(x, y, w, h, parking_text, (255, 255, 0))
            
        cars_boxes = cars_area
        overlaps = compute_overlaps(np.array(parking_spaces), np.array(cars_boxes))
        
        for parking_id, (parking_space_one, area_overlap) in enumerate(zip(parking_spaces, overlaps)):
            max_IoU = max(area_overlap)
            is_free = max_IoU < 0.4  # Определение свободного состояния
            
            # Проверка стабильности состояния парковочного места и отправка уведомлений
            check_parking_status(parking_id, is_free, final_image)
            
            # Перерисовка боксов в соответствии со статусом парковки
            x, y, w, h = parking_space_one
            if is_free:
                parking_text = f'FREE SPACE {parking_id}'
                final_image = draw_bbox(x, y, w, h, parking_text, (0, 255, 0))
            else:
                parking_text = f'No parking {parking_id}'
                final_image = draw_bbox(x, y, w, h, parking_text, (255, 0, 0))

    cv2.imshow("Parking Space", final_image)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        video_capture.release()
        cv2.destroyAllWindows()
        break
