In [1]:
# Эти два импорта используются для работы с изображениями,
# в том числе загрузкой готовых датасетов, их трансформированием и подготовкой к обучению модели.

from torchvision import datasets
import torchvision.transforms as transforms

In [4]:
# Подготовка датасета для обучения модели.
# - Загружаем датасет CelebA - это большой датасет для обучения в области распознавания лиц.
# - split="train" - выбираем часть датасета, которая содержит тренировочные данные.
# - transforms.Compose - определяем последовательность преобразований, которые должны быть применены к изображениям перед их использованием в обучении
#    - transforms.CenterCrop(128): это функция, которая обрезает центральную часть изображения до размера 128x128 пикселей.
#           Все изображения должны иметь один и тот же размер перед подачей их на вход нейросети
#    - transforms.ToTensor(): это преобразование изображений в формат torch.Tensor и нормализация значения пикселей к диапазону [0, 1]. (p / 255)
#    - transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]): это преобразование нормализует каждый канал изображения (Red, Green, Blue),
#           отняв среднее (0.5) и поделив на стандартное отклонение (0.5). Это делается для упрощения процесса обучения.

train_dataset = datasets.CelebA(
    root="/Users/martha-ezer/Desktop/dataset2",
    split="train",
    transform=transforms.Compose(
        [
            transforms.CenterCrop(128),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ]
    ),
    download=False,
)

In [6]:
# Этот блок кода аналогичен прошлому, но в этот раз он загружает тестовый набор данных (вместо обучающего набора данных) из датасета CelebA.

test_dataset = datasets.CelebA(
    root="/Users/martha-ezer/Desktop/dataset2",
    split="test",
    transform=transforms.Compose(
        [
            transforms.CenterCrop(128),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
        ]
    ),
    download=False,
)

In [7]:
# Создаем два объекта DataLoader для обучающего и тестового набора данных соответственно,
# используются в процессе обучения и тестирования модели для итерации по датасетам.

# 1. dataset: это датасет, который будет использоваться. Здесь мы используем train_dataset для обучения и test_dataset для тестирования.
# 2. shuffle: перемешивание данных в произвольном порядке при каждой новой эпохе обучения.
#       Делается для обучающего набора данных для обеспечения лучшего обучения модели.
#       Для тестового набора данных используется shuffle=False, потому что порядок данных не важен при тестировании.
# 3. batch_size: это количество образцов, которые будут пропущены через модель за одну итерацию.
# 4. num_workers: это количество подпроцессов, используемых для загрузки данных.

import torch

train_dataloader = torch.utils.data.DataLoader(
    dataset=train_dataset, shuffle=True, batch_size=128, num_workers=4
)
test_dataloader = torch.utils.data.DataLoader(
    dataset=test_dataset, shuffle=False, batch_size=128, num_workers=4
)

In [8]:
import torch.nn as nn
import torch.nn.functional as F

# slimnet 2017 модель
# Далее определяется архитектура нейронной сети для задачи классификации.
# Есть несколько определенных слоев:
# 1. ConvBNReLU: Это простой слой, который объединяет
#       свертку (Conv), нормализацию по пакету (BN - batch normalization) и функцию активации ReLU в один модуль.
# 2. DWSeparableConv: Это слой глубокой свертки (depthwise separable convolution). Это более эффективный тип свертки.
# 3. SSEBlock: Это слой, который использует предыдущие два для создания более сложной структуры.
# 4. SlimModule: Это слой, который использует вышеописанные элементы для создания еще более сложного блока.
# В SlimNet вся эта структура используется для создания конечной модели нейронной сети.
# И создается экземпляр модели с 40 выходными классами и переносим ее на процессор (CPU).

In [10]:
# Класс ConvBNReLU является подклассом nn.Sequential (объектом из пакета PyTorch, представляющим последовательность слоев).
#   Он позволяет делать прямой проход через все слои последовательно.
#   Этот класс представляет собой конвейер из трех слоев:
#       сверточного слоя nn.Conv2d,
#       слоя нормализации по батчу nn.BatchNorm2d и
#       функции активации ReLU nn.ReLU.
# ConvBNReLU - это обертка для группировки трех часто используемых вместе операций:
#   свертки, батч-нормализации и функции активации ReLU. Это упрощает код и повышает его читаемость.
class ConvBNReLU(nn.Sequential):
    # in_planes - число входных каналов для сверточного слоя
    # out_planes - число выходных каналов для сверточного слоя
    # kernel_size - размер свертки - определяет размер "окна", которое движется по входному изображению.
    #       Значение "3" означает, что используется окно размером 3x3 пиксела
    #       Это наиболее распространенное значение для сверточных слоев, так как оно подходит для выявления большинства типичных паттернов
    #       на изображениях, но не слишком велико, чтобы стать причиной переобучения
    # stride - шаг свертки - определяет, во сколько пикселей смещается окно свертки после каждого вычисления.
    #       Значение "1" означает, что окно смещается на один пиксель на каждом шаге
    # groups - количество групп, на которые разделены входные данные
    #       Значение "1" означает, что все входные каналы обрабатываются вместе, что является стандартной конфигурацией сверточного слоя.
    #       Если бы groups было равно 2, входные каналы были бы разделены на две группы, и для каждой группы вычислялись бы свои свертки.
    #       Это обычно используется для более сложных архитектур.
    def __init__(self, in_planes, out_planes, kernel_size=3, stride=1, groups=1):
        # padding = (kernel_size - 1) // 2 - Вычисление количества "отступа" (padding),
        # которое должно быть добавлено к изображению при применении свертки так, чтобы размеры изображения оставались неизменными.
        padding = (kernel_size - 1) // 2
        # Аргументы передаваемые в super().init(...) - это слои, которые должны быть добавлены в наш Sequential объект
        super(ConvBNReLU, self).__init__(
            # Это сверточный слой. nn.Conv2d применяет двумерную свертку к входному изображению.
            nn.Conv2d(
                in_planes,
                out_planes,
                kernel_size,
                stride,
                padding,
                groups=groups,
                bias=False,
            ),
            # Это слой батч-нормализации. nn.BatchNorm2d нормализует выходное изображение слоя свёртки так,
            # чтобы у каждого был нулевой средний и единичное стандартное отклонение.
            # Используется для ускорения обучения и повышения его стабильности.
            nn.BatchNorm2d(out_planes),
            # Это функция активации ReLU. nn.ReLU() применяет функцию активации ReLU к нормализованному изображению.
            nn.ReLU(inplace=True),
        )

In [11]:
# Класс DWSeparableConv - глубокий разделяемый сверточный слой (Depthwise Separable Convolutions).
# Используется для упрощения и ускорения работы сети за счет разделения свертки на две части:
#   глубинную свертку, обрабатывающей каждый канал отдельно (depthwise convolution) и
#   свертку с равномерными весами, комбинирующей результаты в нужное количество каналов (pointwise convolution).
# Наследуется от nn.Module - это основной класс в PyTorch для всех нейронных сетей и слоев.
class DWSeparableConv(nn.Module):
    # inp - число входных каналов
    # out - число выходных каналов
    def __init__(self, inp, out):
        super().__init__()
        # Создание глубинного сверточного слоя, где каждый входной канал обрабатывается отдельно.
        # Другими словами, здесь inp групп сверток по одной на каждый входной канал. В результате размерность выхода остается той же, что и входа.
        self.dwc = ConvBNReLU(inp, inp, kernel_size=3, groups=inp)
        # Создание сверточного слоя с равномерными весами.
        # Этот слой использует свертку 1x1, чтобы объединить выходы глубинного сверточного слоя в out каналов.
        self.pwc = ConvBNReLU(inp, out, kernel_size=1)

    # Функция, описывающая, как данные пропускаются через модель.
    # В данном случае, она определяет, что вход x последовательно проходит через глубинную свертку и свертку с равномерными весами.
    def forward(self, x):
        # Применение глубинной свертки (dwc) и свертки с равномерными весами (pwc) к входным данным.
        x = self.dwc(x)
        x = self.pwc(x)
        # Результат после прогона входных данных через оба слоя
        return x

In [None]:
# Этот код определяет класс SSEBlock, который является архитектурой блока нейронной сети, использующего
#   свертки (Conv), батч-нормализацию (BN) и функции активации ReLU, а также глубинно разделяемые свертки (Depthwise Separable Convolution).
# В контексте полной модели, SSEBlock представляет собой один из составных блоков этой модели,
#   который преобразует входные данные и генерирует промежуточные выходные данные для следующих слоев или блоков модели.
# Как правило, эти конструкции помогают упростить процесс обучения и упорядочить структуру модели.
class SSEBlock(nn.Module):
    # inp - число входных каналов
    # out - число выходных каналов
    def __init__(self, inp, out):
        super().__init__()
        # определяется переменная out_channel, назначенная числом, равным четырёмкратному количеству выходных каналов (oup).
        # Такой подход применяется для обеспечения большего количества признаковых карт (feature maps) на выходе этих сверточных слоев.
        out_channel = out * 4
        # определение двух прямых сверточных слоев (pwc1 и pwc2) с батч-нормализацией и функцией активации ReLU.
        self.pwc1 = ConvBNReLU(inp, out, kernel_size=1)
        self.pwc2 = ConvBNReLU(out, out_channel, kernel_size=1)
        # создание глубинного разделяемого сверточного слоя, который по сути является двумя сверточными слоями, работающими последовательно.
        self.dwc = DWSeparableConv(out, out_channel)

    # метод, определяющий прямой проход данных через нейросеть.
    def forward(self, x):
        # применение первого сверточного слоя pwc1 к x.
        x = self.pwc1(x)
        # применение второго сверточного слоя pwc2 и глубинно разделяемого сверточного слоя dwc к результату предыдущего слоя.
        out1 = self.pwc2(x)
        out2 = self.dwc(x)
        # склеивание выходов двух последних слоев вдоль второй оси (axis=1) и возвращение результата в качестве выходных данных блока.
        return torch.cat((out1, out2), 1)

In [12]:
# SlimModule, который является архитектурой блока нейронной сети, использующего
#   SSEBlock, глубинно разделяемые свертки (DWSeparableConv), и свёртку с батч-нормализацией и функцией активации ReLU (ConvBNReLU).
# SlimModule - это блок, составленный из других блоков и слоев для обучения сложных признаков из входных данных.
# Сперва данные пропускаются через SSEBlock, затем складываются с выходом простого свёрточного слоя, снова пропускаются через SSEBlock и,
#   в конце, через глубинно разделяемый сверточный слой.
# Эта последовательность операций обеспечивает значительную способность обучения сложным признакам.
# Данные складываются поэлементно: out += self.conv(x) - это пример операции "остатка" (residual operation),
#   которая обычно помогает улучшить обратное распространение градиента и обеспечивает более стабильное обучение.
class SlimModule(nn.Module):
    # inp - число входных каналов
    # oup - число выходных каналов
    def __init__(self, inp, oup):
        super().__init__()
        # Это количества скрытых и выходных каналов, соответственно, для последующих слоев.
        # Значения выбраны в четыре и три раза больше исходного количества каналов.
        hidden_dim = oup * 4
        out_channel = oup * 3
        # определение двух блоков SSEBlock, которые представляют собой своего рода мини-нейронные сети, дополняющие этот модуль.
        self.sse1 = SSEBlock(inp, oup)
        self.sse2 = SSEBlock(hidden_dim * 2, oup)
        self.dwc = DWSeparableConv(hidden_dim * 2, out_channel)
        self.conv = ConvBNReLU(inp, hidden_dim * 2, kernel_size=1)

    def forward(self, x):
        # пропускание входных данных через первый SSEBlock, слой ConvBNReLU, второй SSEBlock и DWSeparableConv соответственно.
        out = self.sse1(x)
        out += self.conv(x)
        out = self.sse2(out)
        out = self.dwc(out)
        return out

In [None]:
# Класс SlimNet определяет архитектуру сложной нейронной сети с использованием внутри составляющих компонентов.
# Данная архитектура сети предназначена для работы со сложными изображениями,
#   извлечения признаков с помощью SlimModule,
#   сокращения размерности сокращение размерности данных с помощью макс-пулинга и
#   глобального усреднения пулинга.
# Это не только упрощает обработку данных, но и избавляет от избыточных пространственных информаций, сокращая риск переобучения.
# После слоя глобального пулинга выходные данные становятся наиболее информативными изображениями в виде одномерных тензоров,
#   содержащих существенную информацию о каждом изображении, который затем передается в полносвязный слой (self.fc).
# Полносвязный слой действует как классификатор, который использует признаки, выделенные предыдущими слоями,
#   для получения окончательных предсказаний изображений по классам.
class SlimNet(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        # создание сверточного слоя с батчем-нормализацией и активацией ReLU.
        # У данного слоя 3 входных канала (RGB), 96 выходных каналов, размер ядра 7x7 и шаг (stride) равный 2.
        self.conv = ConvBNReLU(3, 96, kernel_size=7, stride=2)
        # создание слоя пулинга (pooling), который применяется после сверточного слоя для уменьшения размерности данных.
        # Это делает модель более устойчивой к изменениям входных данных. В данном случае используется макс-пулинг с размером ядра 3x3 и шагом 2.
        self.max_pool0 = nn.MaxPool2d(kernel_size=3, stride=2)

        # Создание нескольких SlimModule и следующих за ними слоев макс-пулинга:
        #   Эти модули являются кастомными блоками нейросети, определенные ранее.
        #   Они обрабатывают входящие данные и увеличивают сложность модели.
        #   Затем за каждым модулем следует слой макс-пулинга для уменьшения размерности.
        self.module1 = SlimModule(96, 16)
        self.module2 = SlimModule(48, 32)
        self.module3 = SlimModule(96, 48)
        self.module4 = SlimModule(144, 64)

        self.max_pool1 = nn.MaxPool2d(kernel_size=3, stride=2)
        self.max_pool2 = nn.MaxPool2d(kernel_size=3, stride=2)
        self.max_pool3 = nn.MaxPool2d(kernel_size=3, stride=2)
        self.max_pool4 = nn.MaxPool2d(kernel_size=3, stride=2)
        # создание слоя глобального пулинга, который сжимает размерность (высоту и ширину) каждого входного канала до 1х1.
        self.gap = nn.AdaptiveAvgPool2d((1, 1))
        # создание полносвязного слоя с 192 входами и выходами, кол-во которых равно числу классов.
        self.fc = nn.Linear(192, num_classes)

    def forward(self, x):
        # Затем входные данные отправляются в SlimModule. За каждым из них следует слой макс-пулинга.
        # Это решение помогает уменьшить размерность признаковых карт и уменьшить переобучение.
        x = self.max_pool0(self.conv(x))
        x = self.max_pool1(self.module1(x))
        x = self.max_pool2(self.module2(x))
        x = self.max_pool3(self.module3(x))
        x = self.max_pool4(self.module4(x))
        # применяется глобальное усреднение пулинга
        x = self.gap(x)
        # входные данные "расплющиваются", то есть преобразуются в одномерный тензор.
        x = torch.flatten(x, 1)
        # применяется полносвязный слой
        x = self.fc(x)
        return x

In [None]:
# Инициализация объекта device, указывающего, на каком устройстве будет выполняться вычисления — на ЦПУ (CPU).
device = torch.device("cpu")
# Создание экземпляра модели SlimNet для задачи классификации на 40 классов. Применение метода .to(device=device) гарантирует, что все вычисления данной модели будут осуществляться на устройстве, заданном в device, то есть на ЦПУ. Это означает, что все тензоры и параметры модели будут храниться и обрабатываться на ЦПУ.
model = SlimNet(num_classes=40).to(device=device)
# При выполнении этого кода, важно убедиться, что у вас достаточно мощный ЦПУ для осуществления всех вычислений, поскольку сложные нейронные сети могут потребовать большого количества вычислительных ресурсов. Если есть доступ к ГПУ, рекомендуется использовать их для значительного ускорения процесса обучения и выполнения. Вместо "cpu" в коде выше вы можете указать "cuda" для выполнения на ГПУ (при наличии).

In [12]:
# Здесь выполняется обучение модели для задачи классификации.


# функция потерь. BCEWithLogitsLoss() соответствует Binary Cross Entropy Loss, который используется для бинарных задач классификации.
loss_criterion = nn.BCEWithLogitsLoss()
# оптимизатор. Он использует метод Adam для обновления параметров модели на основе вычисленных градиентов.
#   lr это скорость обучения, которая контролирует размер шага в каждом обновлении.
# В отличие от методов, таких как градиентный спуск или RMSprop, Adam более устойчив к шуму, пропускам и другим нерегулярностям в данных.
#   Это помогает в обучении на данных лиц, которые могут быть неоднородными и содержать различного рода вариации.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
best_acc = 0.90325
seed = 18203861252700
# Обучение длится 50 "эпох". Эпоха это один проход через весь набор данных.
for epoch in range(50):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    total_train = 0  # Общее количество обучающих примеров
    correct_train = 0  # Число правильно классифицированных обучающих примеров
    running_loss = 0  # Накопленная ошибка обучения
    running_test_loss = 0  # Накопленная ошибка на тестовых данных
    total_test = 0  # Oбщее количество тестовых примеров
    correct_test = 0  # Число правильно классифицированных примеров на тестовом наборе
    # устанавливаем модель в режим обучения
    model.train()
    # Проходим через обучающие данные в нашем DataLoader'е
    for data, target in train_dataloader:
        data = data.to(device=device)
        target = target.type(torch.DoubleTensor).to(device=device)
        # Пропускаем каждый пакет данных через модель
        score = model(data)
        # вычисляем потерю
        loss = loss_criterion(score, target)
        running_loss += loss.item()
        # сбрасываем градиенты
        optimizer.zero_grad()
        # затем обратно распространяем ошибку
        loss.backward()
        # делаем шаг оптимизации
        optimizer.step()
        sigmoid_logits = torch.sigmoid(score)
        predictions = sigmoid_logits > 0.5
        total_train += target.size(0) * target.size(1)
        correct_train += (target.type(predictions.type()) == predictions).sum().item()
    model.eval()
    with torch.no_grad():
        # проходим через test_dataloader, делаем предсказания для каждого пакета и считаем потери и общую точность
        for batch_idx, (images, labels) in enumerate(test_dataloader):
            images, labels = images.to(device), labels.type(torch.DoubleTensor).to(
                device
            )
            logits = model.forward(images)
            test_loss = loss_criterion(logits, labels)
            running_test_loss += test_loss.item()
            sigmoid_logits = torch.sigmoid(logits)
            # Вычисляем прогнозы для нашего входного батча (predictions = sigmoid_logits > 0.5) и
            # подсчитываем количество верно классифицированных примеров.
            predictions = sigmoid_logits > 0.5
            total_test += labels.size(0) * labels.size(1)
            correct_test += (labels.int() == predictions.int()).sum().item()
    test_acc = correct_test / total_test
    # Если точность тестирования этой эпохи лучше предыдущей лучшей, мы сохраняем модель
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(model, f"model_{test_acc*100}.pt")
        print(
            f"For epoch : { epoch} training loss: {running_loss/len(train_dataloader)}"
        )
        print(f"train accruacy is {correct_train*100/total_train}%")
        print(
            f"For epoch : {epoch} test loss: {running_test_loss/len(test_dataloader)}"
        )
        print(f"test accruacy is {test_acc*100}%")

For epoch : 9 training loss: 0.1926736063518541
train accruacy is 91.5143914726301%
For epoch : 9 test loss: 0.21591500222027996
test accruacy is 90.34715960324617%
For epoch : 10 training loss: 0.1902523389572572
train accruacy is 91.6247004976347%
For epoch : 10 test loss: 0.21585689487813894
test accruacy is 90.39938382927562%


KeyboardInterrupt: 

In [17]:
# Экспортируем модель PyTorch в формат ONNX (Open Neural Network Exchange) и затем проверяет экспортированную модель с использованием ONNX Runtime.
# 1. torch.load: Загружает модель PyTorch с помощью функции torch.load. В этом примере модель подгружается из файла model_90.39938382927562.pt.
# 2. x = torch.randn(1, 3, 128, 128, requires_grad=True): Создается случайный тензор для использования в тестировании модели.
# 3. torch.onnx.export: Экспортирует модель PyTorch в формат ONNX. Параметры включают модель, входные данные, имя файла выходного файла и другие настройки, такие как версию ONNX и имена входных и выходных данных.
# 4. onnxruntime.InferenceSession: Создает сессию для запуска обученной модели с использованием ONNX Runtime.
# 5. to_numpy: Вспомогательная функция для преобразования тензора PyTorch в массив NumPy.
# 6. ort_outs = ort_session.run(None, ort_inputs): Вызывает экспортированную модель с входными данными и заполняет выходные данные.
# 7. np.testing.assert_allclose: Сравнивает выходные данные от экспортированной модели и оригинальной модели PyTorch, для проверки того что они близки по значениям (с небольшим зазором).
# Перевод моделей из одного формата в другой дает возможность развертывать свои модели в различных средах и на разной аппаратуре.

import onnxruntime
import numpy as np

torch_model = torch.load("model_90.39938382927562.pt", map_location="cpu")
torch_model.eval()
x = torch.randn(1, 3, 128, 128, requires_grad=True)
torch_out = torch_model(x)
print(torch_out)
torch.onnx.export(
    torch_model,
    x,
    "cpu.onnx",
    export_params=True,
    opset_version=10,
    do_constant_folding=True,
    input_names=["input"],
    output_names=["output"],
)
ort_session = onnxruntime.InferenceSession(
    "cpu.onnx", providers=["CPUExecutionProvider"]
)


def to_numpy(tensor):
    return (
        tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()
    )


ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(x)}
ort_outs = ort_session.run(None, ort_inputs)
print(ort_outs[0])
np.testing.assert_allclose(to_numpy(torch_out), ort_outs[0], rtol=1e-03, atol=1e-05)

print("Exported model has been tested with ONNXRuntime, and the result looks good!")

tensor([[-4.9256e+00, -5.7584e+00, -5.0295e+00, -3.4966e+00, -7.4958e+00,
         -4.9172e+00, -2.6805e+00, -3.0474e+00, -4.1944e+00, -4.8382e+00,
          1.1892e-01, -5.0804e+00, -7.2426e+00, -3.9917e+00, -5.5468e+00,
         -4.0909e+00, -4.9752e+00, -3.5909e+00, -5.4830e+00, -4.3440e+00,
          3.8656e-01, -4.9591e+00, -2.2133e+00, -1.4369e+00,  1.2706e-01,
         -4.5493e+00, -2.8335e+00, -2.8260e+00, -3.2546e+00, -9.1848e+00,
         -4.6841e+00, -5.3648e+00, -4.3216e+00, -3.1212e+00, -2.8049e+00,
          9.0830e-03, -4.8716e+00, -3.2995e+00, -7.1756e+00, -1.4882e+00]],
       grad_fn=<AddmmBackward0>)
[[-4.9255738e+00 -5.7584176e+00 -5.0294933e+00 -3.4966037e+00
  -7.4957557e+00 -4.9172301e+00 -2.6804578e+00 -3.0473750e+00
  -4.1944222e+00 -4.8382230e+00  1.1892551e-01 -5.0804191e+00
  -7.2426486e+00 -3.9917459e+00 -5.5467815e+00 -4.0908670e+00
  -4.9751635e+00 -3.5909424e+00 -5.4830050e+00 -4.3439617e+00
   3.8655975e-01 -4.9591417e+00 -2.2133341e+00 -1.4368612e+00
 

In [19]:
# АНАЛИЗ ВИДЕО

# Этот код выполняет следующие действия:
# 1. Использует библиотеку MediaPipe для обнаружения лиц на видео ("test_face3.mp4").
# 2. Для каждого обнаруженного лица, он извлекает изображение лица и предобрабатывает его (с помощью функции cv2_preprocess),
#       что включает в себя изменение размера изображения, конвертацию цвета, нормализацию и изменение формы массива.
# 3. Затем он производит инференцию с помощью модели ONNX на предварительно обработанном лице.
# 4. Результат инференции преобразуется в массив булевых значений и индексируется с помощью массива атрибутов (list_attr_en),
#       чтобы получить список атрибутов, которые соответствуют лицу.
# 5. Вернувшись к исходному изображению видео, на нем рисуются прямоугольники вокруг обнаруженных лиц
#   и отображается список атрибутов, обнаруженных для каждого лица.
# 6. Процесс повторяется для каждого кадра в видео, и каждый измененный кадр записывается в новое видеофайл - 'output3.avi'.

import onnxruntime
import time
import numpy as np
import cv2
import mediapipe as mp

mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
cap = cv2.VideoCapture("test_face3.mp4")
ort_session = onnxruntime.InferenceSession("cpu.onnx")
list_attr_en = np.array(
    [
        "5_o_Clock_Shadow",
        "Arched_Eyebrows",
        "Attractive",
        "Bags_Under_Eyes",
        "Bald",
        "Bangs",
        "Big_Lips",
        "Big_Nose",
        "Black_Hair",
        "Blond_Hair",
        "Blurry",
        "Brown_Hair",
        "Bushy_Eyebrows",
        "Chubby",
        "Double_Chin",
        "Eyeglasses",
        "Goatee",
        "Gray_Hair",
        "Heavy_Makeup",
        "High_Cheekbones",
        "Male",
        "Mouth_Slightly_Open",
        "Mustache",
        "Narrow_Eyes",
        "No_Beard",
        "Oval_Face",
        "Pale_Skin",
        "Pointy_Nose",
        "Receding_Hairline",
        "Rosy_Cheeks",
        "Sideburns",
        "Smiling",
        "Straight_Hair",
        "Wavy_Hair",
        "Wearing_Earrings",
        "Wearing_Hat",
        "Wearing_Lipstick",
        "Wearing_Necklace",
        "Wearing_Necktie",
        "Young",
    ]
)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
out = cv2.VideoWriter(
    "output3.avi", cv2.VideoWriter_fourcc(*"MJPG"), fps, (width, height)
)


# обработка изображения
def cv2_preprocess(img):
    img = cv2.resize(img, (128, 128), interpolation=cv2.INTER_NEAREST)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    mean = [0.5, 0.5, 0.5]
    std = [0.5, 0.5, 0.5]
    img = (img / 255.0 - mean) / std
    img = img.transpose((2, 0, 1))
    img = np.expand_dims(img, axis=0)
    img = np.ascontiguousarray(img, dtype=np.float32)
    return img


# Сигмоидная функция преобразует каждое входное значение в число в диапазоне от 0 до 1
def sigmoid_array(x):
    return 1 / (1 + np.exp(-x))


# список атрибутов
def result_inference(input_array):
    ort_inputs = {ort_session.get_inputs()[0].name: input_array}
    ort_outs = ort_session.run(None, ort_inputs)
    possibility = sigmoid_array(ort_outs[0]) > 0.5
    result = list_attr_en[possibility[0]]
    return result


with mp_face_detection.FaceDetection(
    model_selection=1, min_detection_confidence=0.5
) as face_detection:
    while cap.isOpened():
        a1 = time.time()
        success, image = cap.read()
        if not success:
            print("Ignoring empty camera frame.")
            break
        image.flags.writeable = False  # предотвратить случайные изменения исходного изображения во время обработки
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = face_detection.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        image2 = image.copy()
        if results.detections:
            for detection in results.detections:
                mp_drawing.draw_detection(image, detection)
                image_rows, image_cols, _ = image.shape
                location = detection.location_data.relative_bounding_box
                start_point = mp_drawing._normalized_to_pixel_coordinates(
                    location.xmin, location.ymin, image_cols, image_rows
                )
                end_point = mp_drawing._normalized_to_pixel_coordinates(
                    location.xmin + location.width,
                    location.ymin + location.height,
                    image_cols,
                    image_rows,
                )
                x1, y1 = start_point
                x2, y2 = end_point
                img_infer = image2[
                    y1 - 70 : y2, x1 - 50 : x2 + 50
                ].copy()  # область интереса
                img_infer = cv2_preprocess(img_infer)
                result = result_inference(img_infer)
                for i in range(0, len(result)):
                    image = cv2.putText(
                        image,
                        result[i],
                        (x1, y1 + i * 40),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1,
                        (255, 255, 255),
                        1,
                        cv2.LINE_AA,
                    )
            a2 = time.time()
        out.write(image)

Ignoring empty camera frame.


In [20]:
# АНАЛИЗ ИЗОБРАЖЕНИЯ
def process_image(image_path):
    image = cv2.imread(image_path)
    with mp_face_detection.FaceDetection(
        model_selection=1, min_detection_confidence=0.5
    ) as face_detection:
        image.flags.writeable = False
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = face_detection.process(image)
        if results is not None:
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
            image2 = image.copy()
            if results.detections:
                for detection in results.detections:
                    mp_drawing.draw_detection(image, detection)
                    image_rows, image_cols, _ = image.shape
                    location = detection.location_data.relative_bounding_box
                    start_point = mp_drawing._normalized_to_pixel_coordinates(
                        location.xmin, location.ymin, image_cols, image_rows
                    )
                    end_point = mp_drawing._normalized_to_pixel_coordinates(
                        location.xmin + location.width,
                        location.ymin + location.height,
                        image_cols,
                        image_rows,
                    )
                    x1, y1 = start_point
                    x2, y2 = end_point
                    img_infer = image2[y1 - 70 : y2, x1 - 50 : x2 + 50].copy()
                    img_infer = cv2_preprocess(img_infer)
                    result = result_inference(img_infer)
                    for i in range(0, len(result)):
                        image = cv2.putText(
                            image,
                            result[i],
                            (x1, y1 + i * 40),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            1,
                            (255, 255, 255),
                            1,
                            cv2.LINE_AA,
                        )
                cv2.imwrite("output.jpg", image)


process_image("test_img.jpg")

In [20]:
# АНАЛИЗ С ВЕБКАМЕРЫ В РЕЖИМЕ РЕАЛЬНОГО ВРЕМЕНИ

import time
import numpy as np
import cv2
import mediapipe as mp

cap = cv2.VideoCapture(0)

with mp_face_detection.FaceDetection(
    model_selection=1, min_detection_confidence=0.5
) as face_detection:
    while cap.isOpened():
        a1 = time.time()
        success, image = cap.read()
        if not success:
            print("Ignoring empty camera frame.")
            break
        image.flags.writeable = False
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = face_detection.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        image2 = image.copy()
        if results.detections:
            for detection in results.detections:
                mp_drawing.draw_detection(image, detection)
                image_rows, image_cols, _ = image.shape
                location = detection.location_data.relative_bounding_box
                start_point = mp_drawing._normalized_to_pixel_coordinates(
                    location.xmin, location.ymin, image_cols, image_rows
                )
                end_point = mp_drawing._normalized_to_pixel_coordinates(
                    location.xmin + location.width,
                    location.ymin + location.height,
                    image_cols,
                    image_rows,
                )

                if end_point is not None:
                    x2, y2 = end_point
                    x1, y1 = start_point[0], start_point[1]

                    # Добавляем проверку на размер изображения
                    y1_new = max(0, y1 - 70)
                    y2_new = min(image.shape[0], y2)
                    x1_new = max(0, x1 - 50)
                    x2_new = min(image.shape[1], x2 + 50)

                    img_infer = image2[y1_new:y2_new, x1_new:x2_new].copy()

                    # Проверяем, что изображение не пустое
                    if img_infer.size != 0:
                        img_infer = cv2_preprocess(img_infer)
                        result = result_inference(img_infer)
                        for i in range(0, len(result)):
                            image = cv2.putText(
                                image,
                                result[i],
                                (x1, y1 + i * 40),
                                cv2.FONT_HERSHEY_SIMPLEX,
                                1,
                                (255, 255, 255),
                                1,
                                cv2.LINE_AA,
                            )

        cv2.imshow("Frame", image)
        a2 = time.time()

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
cap.release()
cv2.imshow("Frame", image)