In [None]:
# Импортируем необходимые библиотеки
import os
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import cv2
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import nltk
from collections import Counter
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import random_split
from torch.cuda.amp import GradScaler, autocast

In [None]:
# Установим случайные начальные значения для воспроизводимости
torch.manual_seed(42)
np.random.seed(42)

In [None]:
# Проверим доступность GPU видеокарты
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Используемое устройство: {device}")

In [None]:
# 1. Предварительная обработка данных

class VideoDataset(Dataset):
    """Датасет для работы с видео и их описаниями"""
    
    def __init__(self, feature_dir, caption_file, vocab, max_frames=40):
        """Инициализация датасета
        
        Аргументы:
            feature_dir (str): Путь к директории с предвычисленными признаками
            caption_file (str): Файл с описаниями в формате "video_id описание"
            vocab (Vocabulary): Объект словаря для токенизации
            max_frames (int): Макс. количество кадров на видео
        """
        self.feature_dir = feature_dir
        self.max_frames = max_frames
        self.vocab = vocab
        
        # Загрузка и парсинг описаний
        self.captions = self._load_captions(caption_file)
        self.video_ids = list(self.captions.keys())
    
    def _load_captions(self, caption_file):
        """Загружает описания из файла"""
        captions = {}
        with open(caption_file, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split(' ', 1)
                if len(parts) == 2:
                    video_id, caption = parts
                    captions.setdefault(video_id, []).append(caption)
        return captions
    
    def __len__(self):
        return len(self.video_ids)
    
    def __getitem__(self, idx):
        """Получает один элемент датасета по индексу"""
        video_id = self.video_ids[idx]
        
        # Загрузка предвычисленных признаков
        features = self._load_features(video_id)
        
        # Выбор случайного описания и токенизация
        caption = self._process_caption(video_id)
        
        return features, caption
    
    def _load_features(self, video_id):
        """Загружает признаки видео из файла"""
        feature_path = os.path.join(self.feature_dir, f"{video_id}.npy")
        features = np.load(feature_path)
        
        # Проверяем и корректируем размерность
        if features.ndim > 2:            
            features = features.reshape(features.shape[0], -1) # Преобразуем к [seq_len, feature_dim]
        return torch.FloatTensor(features).to(device)
    
    def _process_caption(self, video_id):
        """Токенизирует и преобразует описание в тензор"""
        caption = np.random.choice(self.captions[video_id])
        tokens = nltk.tokenize.word_tokenize(caption.lower())
        caption = [self.vocab('<start>')] + [self.vocab(token) for token in tokens] + [self.vocab('<end>')]
        return torch.LongTensor(caption)

class Vocabulary:
    """Словарь для преобразования слов в индексы"""
    
    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.idx = 0
        self._add_special_tokens()
    
    def _add_special_tokens(self):
        """Добавляет специальные токены"""
        for token in ['<pad>', '<start>', '<end>', '<unk>']:
            self.add_word(token)
    
    def add_word(self, word):
        """Добавляет слово в словарь"""
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx += 1
    
    def __call__(self, word):
        """Возвращает индекс слова или токен <unk>"""
        return self.word2idx.get(word, self.word2idx['<unk>'])
    
    def __len__(self):
        return len(self.word2idx)

def build_vocab(caption_file, threshold=3):
    """Строит словарь на основе файла с описаниями"""
    counter = Counter()
    
    # Подсчет частот слов
    with open(caption_file, 'r', encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split(' ', 1)
            if len(parts) == 2:
                counter.update(nltk.tokenize.word_tokenize(parts[1].lower()))
    
    # Фильтрация по порогу
    vocab = Vocabulary()
    for word, count in counter.items():
        if count >= threshold:
            vocab.add_word(word)
    
    return vocab

def precompute_features(video_dir, output_dir, batch_size=16):
    """Предварительно вычисляет признаки видео с помощью ResNet152"""
    
    os.makedirs(output_dir, exist_ok=True)
    
    # Инициализация модели для извлечения признаков
    model = models.resnet152(weights=models.ResNet152_Weights.IMAGENET1K_V1)
    feature_extractor = nn.Sequential(*list(model.children())[:-1]).to(device)
    feature_extractor.eval()
    
    # Трансформации для кадров
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                           std=[0.229, 0.224, 0.225])
    ])
    
    # Обработка видеофайлов
    video_files = [f for f in os.listdir(video_dir) if f.endswith(('.mp4', '.avi'))]
    
    for i in range(0, len(video_files), batch_size):
        batch_files = video_files[i:i + batch_size]
        
        for video_file in tqdm(batch_files, desc=f"Batch {i//batch_size + 1}"):
            
            video_id = os.path.splitext(video_file)[0]
            feature_path = os.path.join(output_dir, f"{video_id}.npy") # Проверяем, существуют ли уже признаки
            if os.path.exists(feature_path):
                continue  # Признаки уже есть, пропускаем

            video_path = os.path.join(video_dir, video_file)
            # Извлечение кадров
            frames = extract_frames(video_path)
            
            # Извлечение признаков
            features = []
            with torch.no_grad():
                for frame in frames:
                    if frame.ndim == 3:  # Проверка валидности кадра
                        frame = transform(frame).unsqueeze(0).to(device)
                        feature = feature_extractor(frame).squeeze().cpu().numpy()
                        features.append(feature)
            
            # Сохранение признаков
            features = np.stack(features, axis=0)
            np.save(os.path.join(output_dir, f"{video_id}.npy"), features)

def extract_frames(video_path, max_frames=40):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")
    
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    sample_rate = max(1, frame_count // max_frames)
    frames = []
    
    for i in range(0, frame_count, sample_rate):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = cap.read()
        if ret:
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        if len(frames) >= max_frames:
            break
    
    cap.release()
    
    # Если кадров меньше, чем нужно, дублируем последний кадр вместо нулей
    if len(frames) < max_frames and len(frames) > 0:
        last_frame = frames[-1]
        frames.extend([last_frame for _ in range(max_frames - len(frames))])
    elif len(frames) == 0:  # Если кадров нет вообще
        frames = [np.zeros((224, 224, 3), dtype=np.uint8) for _ in range(max_frames)]
    
    return frames

In [None]:
# 2. Извлечение признаков

class FeatureExtractor:
    """
    Класс для извлечения признаков из видеокадров с использованием предобученной CNN
    Основные функции:
    - Инициализация предобученной модели CNN (по умолчанию ResNet152)
    - Преобразование входных кадров к нужному формату
    - Извлечение признаков из каждого кадра
    """
    def __init__(self, cnn_model=None):
        if cnn_model is None:
            # Загрузка предобученной ResNet152
            cnn_model = models.resnet152(weights=models.ResNet152_Weights.IMAGENET1K_V1)
            # Удаляем последний классификационный слой
            self.model = nn.Sequential(*list(cnn_model.children())[:-1])
        else:
            self.model = cnn_model

        # Перенос модели на устройство (GPU/CPU) и перевод в режим оценки
        self.model = self.model.to(device)
        self.model.eval()
        
        # Определение преобразований для входных изображений
        self.transform = transforms.Compose([
            transforms.ToPILImage(),               # Конвертация в PIL Image
            transforms.Resize((224, 224)),         # Изменение размера под вход сети
            transforms.ToTensor(),                 # Конвертация в тензор
            transforms.Normalize(                  # Нормализация
                mean=[0.485, 0.456, 0.406],        # Средние значения ImageNet
                std=[0.229, 0.224, 0.225]          # Стандартные отклонения ImageNet
            )
        ])
    
    def extract_features(self, frames):
        """
        Извлекает признаки из списка кадров
        
        Аргументы:
            frames (list): Список кадров в формате numpy arrays
            
        Возвращает:
            torch.Tensor: Извлеченные признаки размерности [число_кадров, размерность_признака]
            
        Процесс работы:
        1. Применение преобразований к каждому кадру
        2. Извлечение признаков с помощью CNN
        3. Накопление и объединение признаков
        """
        features = []
        
        with torch.no_grad(): # Отключаем вычисление градиентов для ускорения
            for frame in frames:
                # Применяем преобразования и добавляем batch-размерность
                frame = self.transform(frame).unsqueeze(0).to(device)
                
                # Извлекаем признаки
                feature = self.model(frame)
                feature = feature.squeeze() # Удаляем лишние размерности
                
                features.append(feature.cpu()) # Переносим на CPU для экономии памяти
        
        return torch.stack(features) # Объединяем все признаки в один тензор

In [None]:
# 3. Архитектура модели

class Encoder(nn.Module):
    """
    Видео-энкодер для обработки признаков кадров с учетом временной информации
    Использует двунаправленный LSTM для анализа последовательности кадров
    
    Основные функции:
    - Обработка признаков отдельных кадров
    - Учет временных зависимостей между кадрами
    - Подготовка скрытых состояний для декодера
    """
    def __init__(self, feature_dim, hidden_dim, num_layers=1, dropout=0.5):
        """
        Инициализация энкодера
        
        Аргументы:
            feature_dim (int): Размерность входных признаков кадра
            hidden_dim (int): Размерность скрытого слоя LSTM
            num_layers (int): Количество слоев LSTM
            dropout (float): Вероятность дропаута
        """
        super(Encoder, self).__init__()
        
        self.feature_dim = feature_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        # Двунаправленный LSTM для временного кодирования
        self.lstm = nn.LSTM(
            input_size=feature_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,          # Первая размерность - batch
            bidirectional=True,       # Двунаправленная архитектура
            dropout=dropout if num_layers > 1 else 0  # Дропаут только для многослойных LSTM
        )
        
    def forward(self, features):
        """
        Прямой проход через энкодер
        
        Аргументы:
            features (torch.Tensor): Признаки видеокадров размерности 
                                    [batch_size, seq_len, feature_dim]
            
        Возвращает:
            tuple: (outputs, hidden)
                - outputs: Выходы LSTM [batch_size, seq_len, hidden_dim*2]
                - hidden: Кортеж (скрытое состояние, состояние ячейки)
        """
         # Проверяем размерность входных данных
        if features.dim() > 3:
            batch_size, seq_len = features.size(0), features.size(1)
            # Преобразуем к [batch_size, seq_len, feature_dim]
            features = features.view(batch_size, seq_len, -1)
            
        # Убеждаемся, что последнее измерение имеет правильный размер
        if features.size(-1) != self.feature_dim:
            raise ValueError(f"Неверная размерность признаков: ожидается {self.feature_dim}, получено {features.size(-1)}")
        
        # Пропускаем признаки через LSTM
        outputs, hidden = self.lstm(features)
        
        return outputs, hidden