# 1. Импорт библиотек и настройка окружения

In [None]:
import os

input_dir = '/kaggle/input'
file_count = 0

for root, dirs, files in os.walk(input_dir):
    for file in files:
        if file_count < 5:
            print(os.path.join(root, file))
            file_count += 1
        else:
            break
    if file_count >= 5:
        break

In [None]:
!pip install dlib --no-cache-dir --force-reinstall --no-build-isolation

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import cv2
import imageio
import os
import glob
import string
from typing import List
import dlib

In [None]:
print(dlib.__version__)

## Распознавание ориентиров лица


In [None]:
!wget https://github.com/italojs/facial-landmarks-recognition/raw/master/shape_predictor_68_face_landmarks.dat

In [None]:
# Инициализация детектора лиц dlib и предиктора ориентиров. Индексы ориентиров рта (согласно 68-точечной модели dlib)

detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat")

MOUTH_POINTS = list(range(48, 61))

In [None]:
# Обрезает область губ из кадра видео, используя ключевые точки от dlib
def clip_mouth(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = detector(gray)

    if len(faces) == 0:
        return None

    face = faces[0]
    landmarks = predictor(gray, face)

    mouth_points = []
    for i in MOUTH_POINTS:
        x = landmarks.part(i).x
        y = landmarks.part(i).y
        mouth_points.append((x, y))

    x_coords = [pt[0] for pt in mouth_points]
    y_coords = [pt[1] for pt in mouth_points]
    min_x = min(x_coords)
    max_x = max(x_coords)
    min_y = min(y_coords)
    max_y = max(y_coords)

    padding = 30
    min_x = max(0, min_x - padding)
    max_x = min(frame.shape[1], max_x + padding)
    min_y = max(0, min_y - padding)
    max_y = min(frame.shape[0], max_y + padding)

    mouth_region = frame[min_y:max_y, min_x:max_x]

    return mouth_region

In [None]:
# Загрузка видео и предобработка
def load_video(path: str) -> List[float]:
    cap = cv2.VideoCapture(path)
    frames = []

    for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
        ret, frame = cap.read()
        if not ret:
            break

        # Обрезка области рта из видео
        mouth_region = clip_mouth(frame)
        if mouth_region is not None:
            # Изменение размеров обрезанной области рта до фиксированного размера, например, 128x64
            mouth_region = cv2.resize(mouth_region, (140, 46), interpolation=cv2.INTER_AREA)
            mouth_region = cv2.cvtColor(mouth_region, cv2.COLOR_BGR2GRAY) 
            mouth_region = tf.expand_dims(mouth_region, axis=-1)  
            frames.append(mouth_region)

    cap.release()
    frames_tensor = tf.stack(frames)  

    # Нормализовать кадры
    mean = tf.reduce_mean(frames_tensor)
    std = tf.math.reduce_std(tf.cast(frames_tensor, tf.float32))

    return tf.cast((frames_tensor - mean), tf.float32) / std

# 2. Функции загрузки данных

## Характеристики загрузки видео и предварительной обработки

* Шаг 1. Прочтение видеофайла

* Шаг 2. Обрезка кадров в приблизительном месте расположения губ

* Шаг 3. Нормализация кадров

In [None]:
frames = load_video("/kaggle/input/data/s1_processed/bbaf2n.mpg")
print(frames.shape)

In [None]:
frames.shape

Тепловая карта выделенной области рта из видеофрагмента

In [None]:
plt.imshow(frames[3])

## Работа с текстом (алфавит и кодирование)
Создание словарей для преобразования символов в числа и обратно.

Зачем нужно преобразование символов в числа и обратно?
В задачах обработки естественного языка (NLP) и, в частности, в распознавании речи или движений губ (как в данном коде), нейронные сети работают не с символами напрямую, а с их числовыми представлениями. Это связано с тем, что:


1. Нейросети оперируют числами
   * Математические операции (например, умножение матриц в слоях LSTM или свёртках) выполняются только над числовыми данными. Символы (буквы, знаки препинания) необходимо преобразовать в числа.
2. Унификация и стандартизация
   * Каждый символ алфавита (например, 'a', 'b', '?') получает уникальный числовой идентификатор.
   * Это позволяет избежать неоднозначностей (например, регистр букв или пробелы).
3. Работа с loss-функциями
   * Функция потерь (например, CTC Loss) вычисляет разницу между предсказанными и истинными последовательностями в числовом виде.

In [None]:
vocab = string.ascii_lowercase + "'?! "
vocab = list(vocab)

In [None]:
char_to_num = tf.keras.layers.StringLookup(vocabulary=vocab, oov_token="")
num_to_char = tf.keras.layers.StringLookup(vocabulary=char_to_num.get_vocabulary(), oov_token="", invert=True)

In [None]:
char2num_dict = {c: char_to_num(c).numpy() for c in char_to_num.get_vocabulary()}
char2num_dict

In [None]:
num2char_dict = {char_to_num(c).numpy():c  for c in num_to_char.get_vocabulary()}
num2char_dict

In [None]:
print(char_to_num(["a", "b", "c"]), 
     num_to_char([1,2,3]))

## Спецификация загрузки выравниваний и предварительной обработки

* Шаг 1. Прочитать текстовый файл

* Шаг 2. Получить текст для каждого слова

* Шаг 3. Создать тензор чисел из таблицы поиска

### Загрузка разметки
Загружает временную разметку произнесённых слов и преобразует её в последовательность чисел.

In [None]:
def load_alignment(path : str):
    with open(path, "r") as f:
        lines = f.readlines()
        
    tokens = []
    
    for line in lines:
        start, end, text = line.split()
        if text!='sil':
            tokens.append(text)
            
    chars = list(" ".join(tokens))
    
    return char_to_num(chars)

In [None]:
num = load_alignment("/kaggle/input/data/s1_processed/align/bbaf2n.align")
print(num)
print(num_to_char(num))

## Комбинированная функция загрузки для одновременной загрузки видео и выравниваний

In [None]:
def load_data(video_path : str):
    video_id = video_path.numpy().decode('UTF-8').replace("\\", "/").split("/")[-1].split(".")[0]
    align_path = f"/kaggle/input/data/s1_processed/align/{video_id}.align"
    video_path = video_path.numpy().decode('UTF-8')
    video_data = load_video(video_path)
    char_num = load_alignment(align_path)
    return video_data, char_num

In [None]:
v,c = load_data(tf.constant("/kaggle/input/data/s1_processed/bbaf2n.mpg"))

In [None]:
v.shape, c.shape

In [None]:
def mappable_function(path: str):
    try:
        result = tf.py_function(load_data, [path], (tf.float32, tf.int64))
    except Exception as e:
        print(f"Error in {path}: {e}")
        result = None
    return result

# 3. Функция загрузки данных

In [None]:
from sklearn.model_selection import train_test_split

#### После того, как все закончили и прошли процесс моделирования, обнаружил, что некоторые видеопути вызывали проблемы, поэтому решил их исключить.

In [None]:
# Список видео с проблемами для исключения
videos_with_issues = [
    '/kaggle/input/data/s1_processed/lgal8n.mpg',
    '/kaggle/input/data/s1_processed/bbaf4p.mpg',
    '/kaggle/input/data/s1_processed/swwp3s.mpg',
    '/kaggle/input/data/s1_processed/lwik9s.mpg',
    '/kaggle/input/data/s1_processed/pgwr6p.mpg'
]

# Получение списка всех видеофайлов .mpg в указанном каталоге
videos = glob.glob("/kaggle/input/data/s1_processed/*.mpg")

# Исключение видео с проблемами
filtered_videos = [video for video in videos if video not in videos_with_issues]

# Количество отфильтрованных видео
print("Number of videos found (excluding issues):", len(filtered_videos))

# 4. Разделение видео. Создание датасета

In [None]:
train, test = train_test_split(filtered_videos, test_size=0.2, random_state=42)

In [None]:
# Создаёт батчи из видео и их текстовых разметок с дополнением (padding) до одинаковых размеров.
data = tf.data.Dataset.from_tensor_slices(train)
data = data.shuffle(500)
data = data.map(mappable_function, num_parallel_calls=tf.data.AUTOTUNE)
data = data.padded_batch(8, padded_shapes=([75, 46, 140, 1], [40]))
data = data.prefetch(tf.data.AUTOTUNE).cache()

val = tf.data.Dataset.from_tensor_slices(test)
val = val.shuffle(500)
val = val.map(mappable_function, num_parallel_calls=tf.data.AUTOTUNE)
val = val.padded_batch(8, padded_shapes=([75,  46, 140, 1], [40]))
val = val.prefetch(tf.data.AUTOTUNE).cache()

In [None]:
it = data.as_numpy_iterator()

In [None]:
f, a = it.next()
f.shape, a.shape

# 5. Построение модели

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (Conv3D, Dense, LSTM, Bidirectional, Dropout, 
                                     MaxPool3D, Activation, Reshape, SpatialDropout3D, 
                                     BatchNormalization, TimeDistributed, Flatten, Input)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler, ModelCheckpoint
import tensorflow as tf
from tensorflow.keras import layers, models

#### **Фрагмент для подключения к Kaggle TPU**

In [None]:
# print("Tensorflow version " + tf.__version__)
# AUTO = tf.data.experimental.AUTOTUNE

# try:
#     tpu = tf.distribute.cluster_resolver.TPUClusterResolver() 
#     print('Running on TPU ', tpu.master())
# except ValueError:
#     tpu = None

# if tpu:
#     tf.config.experimental_connect_to_cluster(tpu)
#     tf.tpu.experimental.initialize_tpu_system(tpu)
#     strategy = tf.distribute.experimental.TPUStrategy(tpu)
# else:
#     strategy = tf.distribute.get_strategy() 

# print("REPLICAS: ", strategy.num_replicas_in_sync)

## Функция потерь CTC (Connectionist Temporal Classification)
CTC функция вычисляет потери CTC между предсказаниями и целями

In [None]:
def CTCLoss(y_true, y_pred):
    batch_size = tf.cast(tf.shape(y_true)[0], tf.int64)
    input_len = tf.cast(tf.shape(y_pred)[1], tf.int64)
    label_len = tf.cast(tf.shape(y_true)[1], tf.int64)

    input_len = input_len * tf.ones(shape=(batch_size, 1), dtype = tf.int64)
    label_len = label_len * tf.ones(shape=(batch_size, 1), dtype = tf.int64)
    loss = tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_len, label_len) 
    
    return loss

## Архитектура модели (3D-CNN + LSTM)

Слои:
  - 3D-CNN: Обрабатывает пространственно-временные данные (кадры видео).
  - LSTM: Анализирует временные зависимости в последовательности кадров.
  - Bidirectional: Улучшает качество за счёт анализа контекста в обоих направлениях.

In [None]:
model = Sequential()
model.add(Input(shape=(75, 46, 140, 1)))
model.add(Conv3D(128, kernel_size=3, padding='same', activation='relu'))
model.add(MaxPool3D(pool_size=(1,2,2)))

model.add(Conv3D(256, kernel_size=3, padding='same', activation='relu'))
model.add(MaxPool3D(pool_size=(1,2,2)))

model.add(Conv3D(64, kernel_size=3, padding='same', activation='relu'))
model.add(MaxPool3D(pool_size=(1,2,2)))
model.add(Reshape([75, 5440]))

model.add(Bidirectional(LSTM(256, return_sequences=True)))
model.add(Dropout(0.5))

model.add(Bidirectional(LSTM(256, return_sequences=True)))
model.add(Dropout(0.5))
model.add(Bidirectional(LSTM(256, return_sequences=True)))
model.add(Dropout(0.5))

model.add(Dense(512, activation='relu', kernel_initializer='he_normal'))
model.add(Dense(512, activation='relu', kernel_initializer='he_normal'))
model.add(Dense(char_to_num.vocabulary_size()+1, activation='softmax', kernel_initializer='he_normal'))

model.summary()

model.compile(optimizer=Adam(0.0001), loss=CTCLoss)

In [None]:
model.compile(optimizer=Adam(0.0001), loss=CTCLoss)

In [None]:
def schedular(epoch, lr):
    if epoch<100:
        return lr
    else:
        return lr * tf.math.exp(-0.1)

In [None]:
class ProduceExample(tf.keras.callbacks.Callback): 
    def __init__(self, dataset) -> None: 
        self.dataset = dataset
        self.it = self.dataset.as_numpy_iterator()

    def on_epoch_end(self, epoch, logs=None) -> None:
        data = self.it.next()
        
        if data[0].shape[0] < 8:
            self.it = self.dataset.as_numpy_iterator()
            data = self.it.next()
            
        yhat = model.predict(data[0], verbose=0)
        decoded = tf.keras.backend.ctc_decode(yhat, [75]*data[0].shape[0], greedy=True)[0][0].numpy()

        for x in range(min(5, len(decoded))):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [None]:
callbacks = [
    ModelCheckpoint("model-226-plus-20.keras", monitor="val_loss", save_best_only=True, verbose=1),
    ModelCheckpoint("model-226-plus-20-loss.keras", monitor="loss", save_best_only=True, verbose=1),

    ModelCheckpoint("model-226-plus-20-weights.weights.h5", monitor="val_loss", save_best_only=True, verbose=1, save_weights_only=True),
    ModelCheckpoint("model-226-plus-20-loss-weights.weights.h5", monitor="loss", save_best_only=True, verbose=1, save_weights_only=True),

    ProduceExample(val),
    ProduceExample(data)
]

In [None]:
print(model.output_shape)

In [None]:
history=model.fit(data, epochs=100, validation_data=(val), callbacks=callbacks, verbose=1)

In [None]:
model.save("LipNe(18-10).keras")
model.save("LipNet1(18-10).h5")

In [None]:
callbacks[-1].on_epoch_end(10)

# После 100 эпох

In [None]:
import matplotlib.pyplot as plt

loss = history.history['loss']
val_loss = history.history['val_loss']

plt.figure(figsize=(8, 6))
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.title('Model Loss Over Epochs')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend(loc='upper right')
plt.grid(True)
plt.show()

# Часть прогнозирования

In [None]:
sample = load_data(tf.convert_to_tensor('/kaggle/input/data/s1_processed/bbal7s.mpg'))

In [None]:
sample[0].shape

In [None]:
sample[1]

In [None]:
print('~'*100, 'REAL TEXT')
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in [sample[1]]]

In [None]:
yhat = model.predict(tf.expand_dims(sample[0], axis=0))

In [None]:
decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0][0].numpy()

In [None]:
print('~'*100, 'PREDICTIONS')
[tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]