# 1. Подготовка


## 1.1 Установка зависимостей


#### WSL 2.0

In [None]:
!pip install tf-nightly[and-cuda] keras opencv-python matplotlib

#### Windows

In [None]:
!conda install -c conda-forge cudatoolkit=11.2 cudnn=8.1.0 matplotlib

In [None]:
!pip install "tensorflow<2.11" keras opencv-python

In [None]:
!python scripts/check_cuda.py

## 1.2 Импортирование стандартных зависимостей


In [84]:
import os
import uuid
import tarfile
import cv2
import numpy as np
from matplotlib import pyplot as plt

## 1.3 Импортирование зависимостей TensorFlow (Функциональный API)


In [40]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Lambda
from tensorflow.keras.models import Model
from tensorflow.keras.metrics import Precision, Recall

## 1.4 Установка роста потребления памяти GPU для предотвращения ошибо OOM.


In [41]:
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
    print(gpu)

PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')


## 1.5 Создание структуры директорий


#### Установка путей


In [42]:
POS_PATH = os.path.join("data", "positive")
NEG_PATH = os.path.join("data", "negative")
ANC_PATH = os.path.join("data", "anchor")

INPUT_IMG_DIR_PATH = os.path.join("app_data", "input_image")
VERIF_IMG_DIR_PATH = os.path.join("app_data", "verification_images")

INPUT_IMG_PATH = os.path.join(INPUT_IMG_DIR_PATH, "input_image.jpg")

DIRECTORIES = [POS_PATH, NEG_PATH, ANC_PATH, INPUT_IMG_DIR_PATH, VERIF_IMG_DIR_PATH]

#### Создание директорий


In [43]:
def create_directories(directories):
    for dir_path in directories:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
            print(f"Создана директория: {dir_path}")
        else:
            print(f"Директория {dir_path} уже существует")


create_directories(DIRECTORIES)

Директория data\positive уже существует
Директория data\negative уже существует
Директория data\anchor уже существует
Директория app_data\input_image уже существует
Директория app_data\verification_images уже существует


# 2 Сбор данных


Скачать все изображения в виде gzip-tar-файла https://vis-www.cs.umass.edu/lfw/#download


## 2.1 Распаковка набора данных


In [44]:
DATASET_NAME = "lfw.tgz"

def load_dataset(dataset_name):
    try:
        with tarfile.open(dataset_name, "r:gz") as tar:
            tar.extractall()
    except FileNotFoundError as e:
        print(e.strerror)
    os.remove(dataset_name) if os.path.exists(dataset_name) else print("Ошибка извлечения, папка набора данных не найдена")

load_dataset(DATASET_NAME)

No such file or directory
Ошибка извлечения, папка набора данных не найдена


## 2.2 Перемещение изображений в директорию data/negative


In [45]:
try:
    for directory in os.listdir(DATASET_NAME):
        for file in os.listdir(os.path.join(DATASET_NAME, directory)):
            ex_path = os.path.join(DATASET_NAME, directory, file)
            new_path = os.path.join(NEG_PATH, file)
            os.replace(ex_path, new_path)
except FileNotFoundError as e:
    print(e.strerror)

The system cannot find the path specified


In [46]:
try:
    os.rmdir(DATASET_NAME)
except OSError as e:
    print(e.strerror)

The system cannot find the file specified


## 2.3 Сбор позитивных и якорных изображений


#### Библиотека для создания уникальных названий изображений.


In [47]:
def write_an_image(key: str, path: str):
    if cv2.waitKey(1) & 0xFF == ord(key):
        # Создание уникального названия изображения
        imgname = os.path.join(path, f"{uuid.uuid1()}.jpg")
        # Запись образца
        cv2.imwrite(imgname, frame)
        print(f"Сохранено изображение {imgname}")

In [13]:
# Установка соединения с веб-камерой
cap = cv2.VideoCapture(0)

# # Отключение панели инструментов OpenCV
# cv2.namedWindow("Image Collection", cv2.WINDOW_GUI_NORMAL)

while cap.isOpened():
    ret, frame = cap.read()

    # Обрезание кадра до 250x250px
    frame = frame[115: 115 + 250, 195: 195 + 250, :]

    write_an_image("a", ANC_PATH)
    write_an_image("p", POS_PATH)
    write_an_image("n", NEG_PATH)

    cv2.imshow("Image Collection", frame)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

Сохранено изображение data\negative\9a00f102-b1eb-11ee-a3ac-00155d8903d8.jpg
Сохранено изображение data\negative\9a0f4ea4-b1eb-11ee-92f2-00155d8903d8.jpg
Сохранено изображение data\negative\9a20127e-b1eb-11ee-b452-00155d8903d8.jpg
Сохранено изображение data\negative\9b43616c-b1eb-11ee-822c-00155d8903d8.jpg
Сохранено изображение data\negative\9b51c183-b1eb-11ee-b6db-00155d8903d8.jpg
Сохранено изображение data\negative\9f70080f-b1eb-11ee-9d0b-00155d8903d8.jpg
Сохранено изображение data\negative\9f853def-b1eb-11ee-a4fa-00155d8903d8.jpg
Сохранено изображение data\negative\9ffe987b-b1eb-11ee-b1ce-00155d8903d8.jpg
Сохранено изображение data\negative\a00cb1d5-b1eb-11ee-b5fc-00155d8903d8.jpg
Сохранено изображение data\negative\a0271abb-b1eb-11ee-91b5-00155d8903d8.jpg
Сохранено изображение data\negative\a03557a2-b1eb-11ee-b696-00155d8903d8.jpg
Сохранено изображение data\negative\a03c7e0f-b1eb-11ee-b0f7-00155d8903d8.jpg
Сохранено изображение data\negative\a0590001-b1eb-11ee-b2e8-00155d8903d8.jpg

## 2.4 Дополнение данных


In [48]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def data_aug(img):
    datagen = ImageDataGenerator(
        brightness_range=[0.98, 1.02],
        zoom_range=[0.95, 1.05],
        rotation_range=10,
        horizontal_flip=True,
        vertical_flip=True,
    )
    data = [datagen.random_transform(img) for _ in range(5)]
    return data


In [26]:
def augment_and_save_images(directory):
    for file_name in os.listdir(os.path.join(directory)):
        img_path = os.path.join(directory, file_name)
        img = cv2.imread(img_path)
        augmented_images = data_aug(img)

        for image in augmented_images:
            cv2.imwrite(
                os.path.join(directory, "{}.jpg".format(uuid.uuid1())), image.numpy()
            )

augment_and_save_images(ANC_PATH)
augment_and_save_images(POS_PATH)

# 3 Загрузка и предобработка изображений


## 3.1 Получение каталогов изображений


In [49]:
anchor = tf.data.Dataset.list_files(os.path.join(ANC_PATH, "*.jpg")).take(2473)
positive = tf.data.Dataset.list_files(os.path.join(POS_PATH, "*.jpg")).take(13285)
negative = tf.data.Dataset.list_files(os.path.join(NEG_PATH, "*.jpg")).take(2473)

## 3.2 Предобработка - масштабирование и изменение размера


In [50]:
def preprocess_image(file_path):
    # Чтение изображения
    byte_img = tf.io.read_file(file_path)
    # Загрузка изображения
    img = tf.io.decode_jpeg(byte_img)
    # Изменение размера изображения на 105x105
    img = tf.image.resize(img, (105, 105))
    # Масштабирование изображения в диапазоне от 0 до 1
    img /= 255.0

    return img

## 3.3 Создание помеченного набора данных


In [51]:
positives = tf.data.Dataset.zip(
    (anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor))))
)

negatives = tf.data.Dataset.zip(
    (anchor, negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(anchor))))
)

data = positives.concatenate(negatives)

## 3.4 Сборка тренировочных и тестовых данных


In [52]:
def preprocess_twin(input_img, validation_img, label):
    return preprocess_image(input_img), preprocess_image(validation_img), label

#### Создание загрузчика данных


In [53]:
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size=10000)

#### Данные для обучения


In [54]:
train_data = data.take(round(len(data) * 0.8))
train_data = train_data.batch(16).prefetch(8)

#### Данные для тестирования


In [55]:
test_data = data.skip(round(len(data) * 0.8))
test_data = test_data.take(round(len(data) * 0.2))
test_data = test_data.batch(16).prefetch(8)

# 4. Разработка модели


In [80]:
# Сиамский класс дистанции L1
class L1Dist(Layer):
    def __init__(self, **kwargs):
        super(L1Dist, self).__init__(**kwargs)

    def call(self, inputs):
        x, y = inputs
        return tf.abs(x - y)

In [81]:
def build_siamese_model():
    input_shape = (105, 105, 3)
    
    # Shared base network
    input_image = Input(shape=input_shape)
    x = Conv2D(64, (10, 10), activation="relu")(input_image)
    x = MaxPooling2D((2, 2), padding="same")(x)
    x = Conv2D(128, (7, 7), activation="relu")(x)
    x = MaxPooling2D((2, 2), padding="same")(x)
    x = Conv2D(128, (4, 4), activation="relu")(x)
    x = MaxPooling2D((2, 2), padding="same")(x)
    x = Conv2D(256, (4, 4), activation="relu")(x)
    x = Flatten()(x)
    x = Dense(4096, activation="relu")(x)
    
    base_network = Model(inputs=input_image, outputs=x)

    # Inputs
    input_image = Input(shape=input_shape, name="input_img")
    validation_image = Input(shape=input_shape, name="validation_img")

    # Process each image with the base network
    input_embedding = base_network(input_image)
    validation_embedding = base_network(validation_image)

    # Calculate L1 distance
    l1_distance = L1Dist()([input_embedding, validation_embedding])

    # Classification layer
    classifier = Dense(1, activation='sigmoid')(l1_distance)

    # Create the model
    model = Model(inputs=[input_image, validation_image], outputs=classifier, name="SiameseNetwork")

    return model

In [82]:
siamese_model = build_siamese_model()

ResourceExhaustedError: {{function_node __wrapped__Mul_device_/job:localhost/replica:0/task:0/device:GPU:0}} failed to allocate memory [Op:Mul]

In [None]:
siamese_model.summary()

# 5. Обучение


## 5.1 Настройка потерь и оптимизатора


In [63]:
binary_cross_loss = tf.keras.losses.BinaryCrossentropy()
opt = tf.keras.optimizers.Adam(learning_rate=1e-5)

## 5.2 Установка контрольных точек


In [64]:
checkpoint_dir = "./training_checkpoints"
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(opt=opt, model=siamese_model)

## 5.3 Построение функции шага обучения


In [65]:
@tf.function
def train_step(siamese_model, batch, optimizer, loss_fn):
    with tf.GradientTape() as tape:
        x = batch[:2]
        y = batch[2]
        yhat = siamese_model(x, training=True)
        loss = loss_fn(y, yhat)
    grads = tape.gradient(loss, siamese_model.trainable_variables)
    optimizer.apply_gradients(zip(grads, siamese_model.trainable_variables))
    return loss

## 5.4 Построение цикла обучения


In [66]:
def train(siamese_model, train_data, epochs, optimizer, loss_fn, checkpoint, checkpoint_prefix):
    for epoch in range(1, epochs + 1):
        print(f"\nЭпоха {epoch}/{epochs}")
        progbar = tf.keras.utils.Progbar(len(train_data))

        # Create metrics objects
        r = Recall()
        p = Precision()

        for idx, batch in enumerate(train_data):
            loss = train_step(siamese_model, batch, optimizer, loss_fn)
            yhat = siamese_model.predict(batch[:2])
            r.update_state(batch[2], yhat)
            p.update_state(batch[2], yhat)
            progbar.update(idx + 1)

        if epoch % 10 == 0:
            checkpoint.save(file_prefix=checkpoint_prefix)
            print(f"Loss: {loss.numpy()}, Recall: {r.result().numpy()}, Precision: {p.result().numpy()}")

## 5.5 Обучение модели


In [37]:
train(siamese_model, train_data, epochs=30, optimizer=opt, loss_fn=binary_cross_loss, checkpoint=checkpoint, checkpoint_prefix=checkpoint_prefix)


 Эпоха 1/20
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
Tensor("binary_crossentropy/weighted_loss/value:0", shape=(), dtype=float32)
0.0006796179 0.60978043 0.97603834

 Эпоха 2/20
0.0069738426 0.95357144 0.9857595

 Эпоха 3/20
0.00914889 0.9764647 0.9933775

 Эпоха 4/20
0.056053847 0.98778003 0.99538225

 Эпоха 5/20
8.5433385e-07 0.9904905 0.99547285

 Эпоха 6/20
0.2023139 0.9954774 0.99647886

 Эпоха 7/20
0.0005323805 0.99745286 0.99593085

 Эпоха 8/20
0.06585618 0.99899346 0.9984909

 Эпоха 9/20
0.0034698965 0.9994929 0.9989863

 Эпоха 10/20
2.549189e-05 0.99898475 0.9974658

 Эпоха 11/20
0.13732405 0.99949396 0.9984833

 Эпоха 12/20
1.4027254e-05 0.99746835 0.9964593

 Эпоха 13/20
9.656085e-06 0.9994921 0.9994921

 Эпоха 14/20
0.013354828 1.0 0.9994952

 Эпоха 15/20
0.00086251815 1.0 0.9994942

 Эпоха 16/20
0.0003014718 0.9984841 0.99898887

 Эпоха 17/20
0.09547681 1.0 0.99949265

 Эпоха 18/20
3.6756367e-06 1.0 1.0

 Эпоха 19/20
1.5099862e-06 1.0 1

# 6. Оценка модели


## 6.1 Прогнозы


In [36]:
test_input, test_val, y_true = test_data.as_numpy_iterator().next()

In [37]:
y_hat = siamese_model.predict([test_input, test_val])
model_predictions = [1 if prediction > 0.5 else 0 for prediction in y_hat]
print(model_predictions)



In [38]:
correct_answers = y_true
print(correct_answers)

[0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1]

In [39]:
if np.array_equal(model_predictions, correct_answers):
    print("Прогнозы модели сходятся с верными ответами")

array([0., 0., 1., 1., 1., 1., 0., 0., 1., 1., 0., 0., 0., 1., 0., 1.],
      dtype=float32)

# 7. Сохранение модели


In [74]:
MODEL_NAME: str = "siamese_model.keras"

In [54]:
siamese_model.compile(optimizer='adam', loss=binary_cross_loss, metrics=['accuracy'])
siamese_model.save(MODEL_NAME)

# 8 Загрузка модели


In [83]:
siamese_model = tf.keras.models.load_model(MODEL_NAME, custom_objects={"L1Dist": L1Dist})

ResourceExhaustedError: {{function_node __wrapped__StatelessRandomUniformV2_device_/job:localhost/replica:0/task:0/device:GPU:0}} OOM when allocating tensor with shape[7,7,64,128] and type float on /job:localhost/replica:0/task:0/device:GPU:0 by allocator GPU_0_bfc [Op:StatelessRandomUniformV2]

#### Оценка прогнозов загруженной модели


In [33]:
siamese_model.summary()

Model: "SiameseNetwork"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_img (InputLayer)         [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 validation_img (InputLayer)    [(None, 105, 105, 3  0           []                               
                                )]                                                                
                                                                                                  
 model (Functional)             (None, 4096)         38960448    ['input_img[0][0]',              
                                                                  'validation_img[0][

# 9 Проверка работы модели в реальном времени


## 9.1 Функция подтверждения


In [79]:
def verify(model, detection_threshold, verification_threshold):
    # Preprocess input image
    input_img = preprocess_image(INPUT_IMG_PATH)

    # Preprocess all verification images
    verification_images = []
    for image in os.listdir(VERIF_IMG_DIR_PATH):
        verification_images.append(preprocess_image(os.path.join(VERIF_IMG_DIR_PATH, image)))
    verification_images = np.array(verification_images)

    # Predict all verification images
    batch_input_imgs = np.expand_dims(np.repeat(input_img, len(verification_images), axis=0), axis=1)
    results = model.predict([batch_input_imgs, verification_images])

    # Calculate detection and verification
    detection = np.sum(results > detection_threshold)
    num_verification_images = len(verification_images)
    verification = detection / num_verification_images
    verified = verification > verification_threshold

    return results, verified


## 9.2 Распознавание лица в реальном времени с OpenCV


In [90]:
cap = cv2.VideoCapture(0)

# Отключение панели инструментов OpenCV
cv2.namedWindow("Verification", cv2.WINDOW_GUI_NORMAL)

while cap.isOpened():
    ret, frame = cap.read()
    cv2.imshow("Verification", frame)
    frame = frame[120: 120 + 250, 200: 200 + 250, :]

    # Триггер проверки
    key = cv2.waitKey(1) & 0xFF
    if key == ord("v"):
        
        # Сохранение входного изображения в директорию app_data/input_image
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(hsv)

        lim = 255 - 10
        v[v > lim] = 255
        v[v <= lim] -= 10

        final_hsv = cv2.merge((h, s, v))
        img = cv2.cvtColor(final_hsv, cv2.COLOR_HSV2BGR)

        cv2.imwrite(INPUT_IMG_PATH, cut_frame)
        # Подтверждение
        results, verified = verify(siamese_model, 0.99, 0.98)
        print(results)
        print(verified)

    if key == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()

[array([[0.9999999]], dtype=float32), array([[1.]], dtype=float32), array([[1.]], dtype=float32), array([[0.99614245]], dtype=float32), array([[0.99999976]], dtype=float32), array([[0.9992268]], dtype=float32), array([[0.99989974]], dtype=float32), array([[0.999931]], dtype=float32), array([[0.9999838]], dtype=float32), array([[0.99991]], dtype=float32), array([[0.9999982]], dtype=float32), array([[0.99892163]], dtype=float32), array([[1.]], dtype=float32), array([[0.9999882]], dtype=float32), array([[0.99998105]], dtype=float32), array([[0.99999976]], dtype=float32), array([[0.99998784]], dtype=float32), array([[1.]], dtype=float32), array([[0.99798113]], dtype=float32), array([[0.9999479]], dtype=float32)]
True
[array([[1.]], dtype=float32), array([[1.]], dtype=float32), array([[1.]], dtype=float32), array([[0.999316]], dtype=float32), array([[1.]], dtype=float32), array([[0.99999785]], dtype=float32), array([[0.99999976]], dtype=float32), array([[0.9999999]], dtype=float32), array([

KeyboardInterrupt: 

In [86]:
r = np.sum(np.squeeze(results) > 0.999999)

In [87]:
r

8

In [79]:
results

[array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.9948756]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.9999646]], dtype=float32),
 array([[0.99999344]], dtype=float32),
 array([[0.99999666]], dtype=float32),
 array([[0.9999989]], dtype=float32),
 array([[0.9999974]], dtype=float32),
 array([[0.9999989]], dtype=float32),
 array([[0.9999231]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.99999964]], dtype=float32),
 array([[0.9999989]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.9999989]], dtype=float32),
 array([[1.]], dtype=float32),
 array([[0.9919218]], dtype=float32),
 array([[0.99994063]], dtype=float32)]