In [1]:
import os, sys
import cv2
import numpy as np
from PIL import Image
from ultralytics import YOLO
from openpifpaf import predict
from typing import List, Tuple, Dict, Any
import torch
from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import math

import logging

# Устанавливаем уровень логирования на ERROR, чтобы не выводились информационные сообщения
logging.getLogger().setLevel(logging.ERROR)

In [2]:
class YoloDetector:
    def __init__(self, model_path='yolov8n.pt'):
        self.model = YOLO(model_path, verbose=False)  # Используется YOLOv8

    def detect_and_crop(self, image, annotations, image_size):
        results = self.model(image)
        boxes = results[0].boxes
        people_images = []

        for i in range(len(boxes)):
            cls = int(boxes.cls[i].item())
            if cls == 0:  # человек
                x1, y1, x2, y2 = boxes.xyxy[i].tolist()
                x1, y1 = int(max(x1, 0)), int(max(y1, 0))
                x2, y2 = int(min(x2, image_size[1])), int(min(y2, image_size[0]))

                matched_class = None
                for class_id, xc, yc, w, h in annotations:
                    abs_xc = xc * image_size[1]
                    abs_yc = yc * image_size[0]

                    if x1 <= abs_xc <= x2 and y1 <= abs_yc <= y2:
                        matched_class = class_id
                        break

                if matched_class is not None:
                    cropped = image[y1:y2, x1:x2]
                    bbox = [x1, y1, x2, y2]
                    people_images.append((cropped, bbox, matched_class))

        return people_images


def read_annotations(label_path):
    annotations = []
    if os.path.exists(label_path):
        with open(label_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) == 5:
                    class_id = int(parts[0])
                    x_center, y_center, w, h = map(float, parts[1:])
                    annotations.append((class_id, x_center, y_center, w, h))
    return annotations

def detect_keypoints_openpifpaf(image):
    """Detect human keypoints and return list of points (x, y, confidence)"""
    try:
        # Convert to PIL Image format
        image_pil = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        
        # Initialize predictor
        predictor = predict.Predictor(checkpoint='shufflenetv2k30')
        
        # Get predictions - обработка разных форматов возврата
        predictions = predictor.pil_image(image_pil)
        
        # Для разных версий OpenPifPaf
        if isinstance(predictions, tuple) and len(predictions) == 3:
            # Новые версии возвращают (predictions, _, _)
            predictions = predictions[0]
        
        keypoints_list = []
        
        if predictions:
            for pred in predictions:
                try:
                    # Способ 1: через атрибут .data (новые версии)
                    if hasattr(pred, 'data'):
                        kps = pred.data
                        points = [(float(kp[0]), float(kp[1]), float(kp[2])) for kp in kps]
                    # Способ 2: через атрибут .keypoints (старые версии)
                    elif hasattr(pred, 'keypoints'):
                        kps = pred.keypoints
                        points = [(float(kp[0]), float(kp[1]), float(kp[2])) for kp in kps]
                    else:
                        continue
                    
                    if len(points) == 17:  # Проверяем что получили 17 точек COCO
                        keypoints_list.append(points)
                except Exception as e:
                    print(f"Error processing prediction: {e}")
                    continue
        
        return keypoints_list
    
    except Exception as e:
        print(f"Keypoint detection failed: {e}")
        return []


In [3]:
def normalize_keypoints_relative_to_bbox(keypoints, bbox):
    """
    Нормализует ключевые точки относительно размера bbox.
    
    :param keypoints: Список из 17 ключевых точек (x, y, confidence).
    :param bbox: Bbox вида [x1, y1, x2, y2] для нормализации.
    
    :return: Список нормализованных ключевых точек.
    """
    x1, y1, x2, y2 = bbox
    bbox_width = x2 - x1
    bbox_height = y2 - y1

    norm_keypoints = []
    
    for (x, y, c) in keypoints:
        if x < 0 or y < 0 or c == 0:
            norm_x = 0
            norm_y = 0
        else:
            norm_x = x / bbox_width
            norm_y = y / bbox_height
        norm_keypoints.append([norm_x, norm_y, c])

    return norm_keypoints


In [4]:
class KeypointsDataset(Dataset):
    def __init__(self, images_dir, labels_dir, transform=None):
        self.images_dir = images_dir
        self.labels_dir = labels_dir
        self.image_files = [f for f in os.listdir(images_dir) if f.endswith('.jpg') or f.endswith('.png')]
        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # Загружаем изображение
        image_name = self.image_files[idx]
        image_path = os.path.join(self.images_dir, image_name)
        image = cv2.imread(image_path)
        image_size = image.shape[:2]  # (height, width)

        # Чтение аннотаций
        label_path = os.path.join(self.labels_dir, image_name.replace('.jpg', '.txt').replace('.png', '.txt'))
        annotations = read_annotations(label_path)

        # Детекция и обрезка людей
        yolo_detector = YoloDetector('yolov8n.pt')
        people_images = yolo_detector.detect_and_crop(image, annotations, image_size)

        # Для каждого человека, нормализуем keypoints
        dataset_entry = []
        for cropped_image, bbox, class_id in people_images:
            keypoints = detect_keypoints_openpifpaf(cropped_image)
            if keypoints:  # Если есть ключевые точки
                norm_keypoints = normalize_keypoints_relative_to_bbox(keypoints[0], bbox)  # Нормализуем только первые 17 точек
                dataset_entry.append({
                    'image': image,
                    'class_id': class_id,
                    'bbox': bbox,
                    'keypoints': norm_keypoints
                })

        # Применяем преобразования (если нужны)
        if self.transform:
            # Тут можно добавить преобразования для изображения (например, преобразование в тензор)
            image = self.transform(image)

        return dataset_entry

In [5]:
images_dir = 'dataset/images'
labels_dir = 'dataset/labels'
dataset = KeypointsDataset(images_dir, labels_dir)

In [6]:

# Разделение на обучающий и тестовый датасеты
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

print(f'Обучающий набор: {len(train_dataset)} изображений')
print(f'Тестовый набор: {len(test_dataset)} изображений')


Обучающий набор: 490 изображений
Тестовый набор: 123 изображений


In [7]:
def prepare_data_for_transformer(dataset):
    transformer_input = []
    target_labels = []  # Сюда собираем целевые метки class_id (курит/не курит/пауза)
    
    for sample in dataset:
        for person in sample:
            # Получаем данные
            class_id = person['class_id']  # Это целевая метка
            bbox = np.array(person['bbox'], dtype=np.float32)  # Преобразуем в тензор
            keypoints = np.array(person['keypoints'], dtype=np.float32).flatten()  # Преобразуем в 1D массив
            
            # Строим последовательность признаков для трансформера
            input_sequence = np.concatenate([ 
                bbox,                 # bbox (4 признака)
                keypoints             # keypoints (34 признака)
            ])
            
            transformer_input.append(torch.tensor(input_sequence))
            target_labels.append(torch.tensor(class_id))  # Класс (0 - не курит, 1 - пауза, 2 - курит)
    
    return transformer_input, target_labels

In [8]:

class TransformerModel(nn.Module):
    def __init__(self, feature_dim, num_classes):
        super(TransformerModel, self).__init__()
        
        # Трансформер для обработки последовательности признаков
        self.transformer = nn.Transformer(
            d_model=feature_dim,  # Размерность признаков (bbox + keypoints)
            nhead=11,  # Количество голов в multihead attention
            num_encoder_layers=6,  # Количество слоев энкодера
            num_decoder_layers=6,  # Количество слоев декодера
            batch_first=True  # Указываем, что данные будут в формате (batch_size, seq_len, feature_dim)
        )
        
        # Классификатор для предсказания класса
        self.fc = nn.Linear(feature_dim, num_classes)  # 3 класса (не курит, пауза, курит)

    def forward(self, x):
        # Прогоняем через трансформер
        x = self.transformer(x, x)  # (src, tgt)
        
        # Извлекаем только последний токен для классификации
        x = x[:, -1, :]
        
        # Прогоняем через классификатор
        out = self.fc(x)
        return out

In [9]:
train_inputs, train_labels = prepare_data_for_transformer(train_dataset)
test_inputs, test_labels = prepare_data_for_transformer(test_dataset)

train_inputs = torch.stack(train_inputs)  # (N, 55)
train_labels = torch.tensor(train_labels)
test_inputs = torch.stack(test_inputs)
test_labels = torch.tensor(test_labels)

train_inputs = train_inputs.unsqueeze(1)  # (N, 1, 55)
test_inputs = test_inputs.unsqueeze(1)

# Создаем датасеты и лоадеры
train_loader = DataLoader(TensorDataset(train_inputs, train_labels), batch_size=64, shuffle=True)
test_loader = DataLoader(TensorDataset(test_inputs, test_labels), batch_size=64, shuffle=False)


0: 384x640 4 persons, 4 cars, 19.6ms
Speed: 6.4ms preprocess, 19.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 7 cars, 8.6ms
Speed: 1.1ms preprocess, 8.6ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 4 cars, 8.6ms
Speed: 1.0ms preprocess, 8.6ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 persons, 4 cars, 8.8ms
Speed: 1.1ms preprocess, 8.8ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 persons, 7 cars, 8.5ms
Speed: 1.0ms preprocess, 8.5ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 persons, 4 cars, 8.8ms
Speed: 1.0ms preprocess, 8.8ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 6 cars, 8.7ms
Speed: 1.0ms preprocess, 8.7ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 6 cars, 8.7ms
Speed: 1.0ms preprocess, 8.7ms i

In [17]:


model = TransformerModel(feature_dim=55, num_classes=3)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 100

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    
    for batch_x, batch_y in train_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        optimizer.zero_grad()
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

model.eval()
correct = 0
total = 0

with torch.no_grad():
    for batch_x, batch_y in test_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        outputs = model(batch_x)
        _, predicted = torch.max(outputs, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

print(f"Точность на тесте: {100 * correct / total:.2f}%")

Epoch 1/100, Loss: 0.7560
Epoch 2/100, Loss: 0.6534
Epoch 3/100, Loss: 0.6440
Epoch 4/100, Loss: 0.6294
Epoch 5/100, Loss: 0.6064
Epoch 6/100, Loss: 0.6040
Epoch 7/100, Loss: 0.5909
Epoch 8/100, Loss: 0.5683
Epoch 9/100, Loss: 0.5538
Epoch 10/100, Loss: 0.5408
Epoch 11/100, Loss: 0.5520
Epoch 12/100, Loss: 0.5452
Epoch 13/100, Loss: 0.5494
Epoch 14/100, Loss: 0.5365
Epoch 15/100, Loss: 0.5323
Epoch 16/100, Loss: 0.5171
Epoch 17/100, Loss: 0.5211
Epoch 18/100, Loss: 0.5379
Epoch 19/100, Loss: 0.5098
Epoch 20/100, Loss: 0.5160
Epoch 21/100, Loss: 0.5225
Epoch 22/100, Loss: 0.5275
Epoch 23/100, Loss: 0.5174
Epoch 24/100, Loss: 0.5043
Epoch 25/100, Loss: 0.5144
Epoch 26/100, Loss: 0.4968
Epoch 27/100, Loss: 0.5200
Epoch 28/100, Loss: 0.5025
Epoch 29/100, Loss: 0.5037
Epoch 30/100, Loss: 0.4914
Epoch 31/100, Loss: 0.5084
Epoch 32/100, Loss: 0.5052
Epoch 33/100, Loss: 0.5041
Epoch 34/100, Loss: 0.4883
Epoch 35/100, Loss: 0.4852
Epoch 36/100, Loss: 0.4994
Epoch 37/100, Loss: 0.4951
Epoch 38/1

In [19]:
# Цвета для классов
CLASS_COLORS = {
    0: (0, 255, 0),    # Зеленый
    1: (0, 165, 255),  # Оранжевый (BGR)
    2: (0, 0, 255),    # Красный
}

CLASS_NAMES = {
    0: 'dont',
    1: 'pause',
    2: 'smoke',
}

def visualize_predictions(dataset, model, device='cpu', save_path=None):
    model.eval()
    model.to(device)

    for i, sample in enumerate(dataset):
        if not sample:
            print(f"[{i}] Пропущено: пустое изображение или нет людей")
            continue

        image = None

        for person in sample:
            if image is None:
                if 'image' not in person or person['image'] is None:
                    print(f"[{i}] Ошибка: нет изображения в sample")
                    continue
                image = person['image'].copy()

            bbox = person['bbox']
            keypoints = np.array(person['keypoints'], dtype=np.float32).flatten()
            input_tensor = np.concatenate([bbox, keypoints])
            input_tensor = torch.tensor(input_tensor, dtype=torch.float32).unsqueeze(0).unsqueeze(1).to(device)

            with torch.no_grad():
                output = model(input_tensor)
                pred_class = output.argmax(dim=1).item()

            # Цвет и подпись
            color = CLASS_COLORS.get(pred_class, (255, 255, 255))
            label = CLASS_NAMES.get(pred_class, str(pred_class))

            x1, y1, x2, y2 = map(int, bbox)
            cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
            cv2.putText(image, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        if image is not None:
            if save_path:
                cv2.imwrite(f"{save_path}/result_{i}.jpg", image)
            else:
                cv2.imshow("Prediction", image)
                cv2.waitKey(0)
        else:
            print(f"[{i}] Не удалось визуализировать — image is None")

    cv2.destroyAllWindows()


In [20]:
visualize_predictions(test_dataset, model, device='cuda' if torch.cuda.is_available() else 'cpu')



0: 384x640 1 person, 7 cars, 21.9ms
Speed: 2.1ms preprocess, 21.9ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 persons, 7 cars, 27.6ms
Speed: 1.4ms preprocess, 27.6ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
 buffer
src/openpifpaf/csrc/src/occupancy.cpp:53: UserInfo: resizing occupancy buffer
src/openpifpaf/csrc/src/cif_hr.cpp:102: UserInfo: resizing cifhr buffer
src/openpifpaf/csrc/src/occupancy.cpp:53: UserInfo: resizing occupancy buffer
src/openpifpaf/csrc/src/cif_hr.cpp:102: UserInfo: resizing cifhr buffer
src/openpifpaf/csrc/src/occupancy.cpp:53: UserInfo: resizing occupancy buffer
src/openpifpaf/csrc/src/cif_hr.cpp:102: UserInfo: resizing cifhr buffer
src/openpifpaf/csrc/src/occupancy.cpp:53: UserInfo: resizing occupancy buffer
src/openpifpaf/csrc/src/cif_hr.cpp:102: UserInfo: resizing cifhr buffer
src/openpifpaf/csrc/src/occupancy.cpp:53: UserInfo: resizing occupancy buffer
src/openpifpaf/csrc/src/cif_hr.cpp:102: UserI