In [93]:
from ultralytics import YOLO
import torch
import cv2
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch import sigmoid

In [94]:
class Analyzer:
    def __init__(self, device=None):
        if device:
            self.device = torch.device(device)
        else:
            self.device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

        # подгружаем модель
        # модель детекции телефонов и похожего на телефоны
        self.model_phone_detect = YOLO("./weights/best_phone.pt") 
        # дополнительный классификатор телефон или что-то иное
        self.model_phone_clf = torch.load("./weights/model64_1.pt")
        self.model_phone_clf.eval()
        self.model_phone_clf.to("cpu")

        self.transforms = transform_base = A.Compose([ 
    A.Resize(64, 64), 
    A.Normalize(), 
    ToTensorV2() 
])

        # self.upscale_crop = upscale_crop

        
        self.colors = [
            (),
            (),
            (0, 0, 255)  # phone
        ]

        self.KPS = 1  # Target Keyframes Per Second

    def predict_video(self, video_path):

        cap = cv2.VideoCapture(video_path)
        fps = round(cap.get(cv2.CAP_PROP_FPS))
        hop = round(fps / self.KPS)
        curr_frame = 0
        curr_second = 0

        # число элементов == числу секунд в видео
        # 0 - на фрейме ничего нет, 1 - бокс предсказанный на фрейме с макс вероятностью
        # телефон
        # данные, которые надо записывать во время отработки программы
        phone_sec_lst = []

        while True:
            curr_state = 0  # для phone_sec_lst
            ret, frame = cap.read()
            if not ret: break

            # по одному заходу сюда каждую секунду
            if curr_frame % hop == 0:
                
                # пропустим через детектор
                res_detect = self.detect_phone(frame)
                if res_detect is not None and (len(res_detect["boxes"]) > 0):
                    # если детектор нашел что-то, то пропускаем через классификатор
                    clf_res = self.clf_phone(res_detect["object"])
                    if clf_res:
                        curr_state = 1

                phone_sec_lst.append(curr_state)

                curr_second += 1
            
            curr_frame += 1
        
        cap.release()

        # изображение обработалось теперь надо провести посекундный анализ
        intervals_lst = []
        time_sec_counter = 0
        phone_frame_counter = 0
        print(f"Sec: {len(phone_sec_lst)}")

        # нету ли тут ошибки???
        for item in phone_sec_lst:
            if item == 0:
                if phone_frame_counter < 3:
                    phone_frame_counter = 0
                elif phone_frame_counter == 3:
                    # записываем конец текущего интервала
                    if len(intervals_lst) > 0:
                        intervals_lst[-1].append(time_sec_counter)
                    else:
                        intervals_lst.append([time_sec_counter - 2, time_sec_counter])
                    # обнуляем счетчик
                    phone_frame_counter = 0
                else:
                    # записываем конец текущего интервала
                    intervals_lst[-1].append(time_sec_counter)
                    # обнуляем счетчик
                    phone_frame_counter = 0
            
            else:
                if phone_frame_counter < 3:
                    phone_frame_counter += 1
                elif phone_frame_counter == 3:
                    # инициализируем интервал
                    intervals_lst.append([time_sec_counter - 3])
                    phone_frame_counter += 1
                else:
                    phone_frame_counter += 1

            time_sec_counter += 1

        
        if len(intervals_lst) > 0 and len(intervals_lst[-1]) == 1:
            intervals_lst[-1].append(time_sec_counter)

        if len(intervals_lst) > 0:
            print("Было долгое использование телефона")
            intervals_str = [ 
            f"{self.sec2minutes_sec(item[0])} - {self.sec2minutes_sec(item[1])}"
                          for item in intervals_lst
                          ]

            print(f"Intervals: {intervals_str}")
        
        else:
            print("Нарушений не выявлено")

    def detect_phone(self, img: np.array):
        yolo_results = self.get_yolo_results(img)       

        # if not save is None:
            # собираем изображение и сохраняем его
            # for curr_class, curr_box in  zip(yolo_results["class_ids"], yolo_results["boxes"]):
                # (x, y, x2, y2) = curr_box
                # cv2.rectangle(img, (x, y), (x2, y2), self.colors[curr_class], 2)
            # cv2.imwrite(save, img)

        return yolo_results

    def get_yolo_results(self, tile):
        yolo_results = self.model_phone_detect.predict(source=tile, save=False, save_txt=False, verbose=False)

        height, width, channels = tile.shape

        # если ничего не путаю, возможен только один проход по циклу
        for result in yolo_results:
            if result.boxes is not None:
                # боксы
                bboxes = np.array(result.boxes.xyxyn.cpu(), dtype="float")
                bboxes[:, 0] *= width
                bboxes[:, 2] *= width
                bboxes[:, 1] *= height
                bboxes[:, 3] *= height
                bboxes = bboxes.astype(int)[:1]

                # Get class ids
                class_ids = np.array(result.boxes.cls.cpu(), dtype="int")[:1]

                # Get scores
                scores = np.array(result.boxes.conf.cpu(), dtype="float").round(2)[:1]

                # max_object - бокс с максимальным скором, который мы рассматриваем
                if len(bboxes) > 0:
                    x, y, x2, y2 = bboxes[0]
                    max_object = tile[y:y2, x:x2, :]
                else:
                    max_object = None

                return {
                    "boxes": bboxes,
                    "class_ids": class_ids,
                    "scores": scores,
                    "object": max_object
                }

        return None
    
    def clf_phone(self, img: np.array):
        
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        aug = self.transforms(image=img) 
        img = aug['image']
        

        img = img[np.newaxis, :, :, :]

        
        
        with torch.no_grad(): 
            outputs = res = self.model_phone_clf(img.float())
            probs = torch.sigmoid(outputs) 

            res = probs.to(self.device).item() > 0.6
            

        return res
    
    def sec2minutes_sec(self, sec):
        minutes = sec // 60
        curr_sec = sec % 60
        return f"{minutes}:{curr_sec}"


In [95]:
a = Analyzer()

In [102]:
a.predict_video("9.mp4")

Sec: 601
Нарушений не выявлено
