<a href="https://colab.research.google.com/github/Davottakvota/PredictFrameRNN/blob/main/RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1. Монтируем Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Параметры
BATCH_SIZE = 32
EPOCHS = 10
MAX_FRAMES = 10  # Максимальная длина последовательности
MIN_FRAMES = 4

HEIGHT = 64
WIDTH = 64
CHANNELS = 1

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split



# Функция для создания последовательностей из видео
def create_sequences_from_video(video, min_frames=MIN_FRAMES, max_frames=MAX_FRAMES):
    """Генератор, создающий последовательности из одного видео"""
    n_frames = video.shape[0]
    for start_idx in range(min_frames-1, n_frames - 1):
        seq_length = np.random.randint(min_frames, min(max_frames, start_idx + 1) + 1)
        end_idx = start_idx + seq_length
        if end_idx >= n_frames:
            continue

        input_seq = video[start_idx:end_idx]
        target_frame = video[end_idx]

        # Дополняем последовательность нулями
        padded_seq = np.zeros((max_frames, 64, 64, 1), dtype=np.float32)
        padded_seq[:len(input_seq)] = input_seq

        yield padded_seq, target_frame

# Функция-генератор для всего датасета
def dataset_generator(split='test', indices=None):
    ds = tfds.load('moving_mnist', split=split)
    for i, example in enumerate(tfds.as_numpy(ds)):
        # Если указаны индексы и текущий индекс не входит в список - пропускаем
        if indices is not None and i not in indices:
            continue

        video = example['image_sequence'] / 255.0
        for seq, target in create_sequences_from_video(video):
            yield seq, target

# Создаем полный набор индексов
ds = tfds.load('moving_mnist', split='test')
total_videos = ds.cardinality().numpy()
all_indices = list(range(total_videos))

In [None]:
%%bash
dirs=(
    "/content/drive/My Drive/checkweightsRNN"
)

for dir in "${dirs[@]}"; do
    if [ ! -d "$dir" ]; then
        mkdir -p "$dir"
        echo "✅ Создана папка: $dir"
    else
        echo "ℹ️ Папка уже существует: $dir"
    fi
done

In [None]:
# Разделяем индексы вместо данных
train_indices, test_indices = train_test_split(
    all_indices,
    test_size=0.2,
    random_state=42
)

In [None]:
# Создаем tf.data.Dataset из генераторов
def create_dataset(indices, batch_size=BATCH_SIZE):
    return tf.data.Dataset.from_generator(
        lambda: dataset_generator('test', indices),
        output_signature=(
            tf.TensorSpec(shape=(MAX_FRAMES, 64, 64, 1)),
            tf.TensorSpec(shape=(64, 64, 1)))

    ).batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Создаем тренировочный и тестовый наборы данных
train_dataset = create_dataset(train_indices)
test_dataset = create_dataset(test_indices)

# Оптимизированная архитектура модели без TimeDistributed и маскирования
def build_moving_mnist_model():
    inputs = layers.Input(shape=(MAX_FRAMES, 64, 64, 1))

    # Убираем маскирующий слой
    # Вместо этого будем использовать нулевое дополнение

    # Первый ConvLSTM слой
    x = layers.ConvLSTM2D(
        filters=32,
        kernel_size=(3, 3),
        padding='same',
        return_sequences=True
    )(inputs)
    x = layers.BatchNormalization()(x)

    # Второй ConvLSTM слой
    x = layers.ConvLSTM2D(
        filters=64,
        kernel_size=(3, 3),
        padding='same',
        return_sequences=False
    )(x)
    x = layers.BatchNormalization()(x)

    # Декодер
    x = layers.Conv2D(64, 3, activation='leaky_relu', padding='same')(x)
    x = layers.BatchNormalization()(x)

    outputs = layers.Conv2D(
        filters=1,
        kernel_size=3,
        padding='same',
        activation='sigmoid'
    )(x)

    model = models.Model(inputs, outputs)

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss='binary_crossentropy',
        metrics=['mae']
    )

    return model

# Создаем и обучаем модель
model = build_moving_mnist_model()
model.summary()

# Callback'и
callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=2)
]


In [None]:
# Обучение с уменьшенными параметрами
history = model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=test_dataset,
    callbacks=callbacks,
    steps_per_epoch=300,       # Уменьшаем количество шагов
    validation_steps=100        # Уменьшаем валидационные шаги
)

In [None]:
def plot_training_history(history):
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Loss Evolution')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='Training MAE')
    plt.plot(history.history['val_mae'], label='Validation MAE')
    plt.title('MAE Evolution')
    plt.legend()

    plt.tight_layout()
    plt.show()

# После обучения
plot_training_history(history)

In [None]:
def generate_video_from_sequence(model, initial_frames, total_frames=20):
    """
    Генерирует видео, начиная с заданной последовательности кадров

    Параметры:
    model: обученная модель предсказания кадров
    initial_frames: список начальных кадров (numpy array, shape: [h, w, c])
    total_frames: общее количество кадров в итоговом видео

    Возвращает:
    numpy array всех кадров формы (total_frames, h, w, c)
    """
    # Создаем копию начальных кадров
    sequence = initial_frames.copy()
    num_initial = len(initial_frames)

    # Проверка минимальной длины
    if num_initial < 1:
        raise ValueError("Должен быть хотя бы один начальный кадр")

    # Создаем буфер для входа модели
    current_sequence = np.zeros((1, MAX_FRAMES, HEIGHT, WIDTH, CHANNELS), dtype=np.float32)

    # Рассчитываем сколько кадров нужно сгенерировать
    frames_to_generate = total_frames - num_initial

    # Начальное заполнение буфера
    start_idx = max(0, num_initial - MAX_FRAMES)
    last_frames = sequence[start_idx:num_initial]
    current_sequence[0, -len(last_frames):] = last_frames

    # Генерация новых кадров
    for i in range(frames_to_generate):
        # Предсказываем следующий кадр
        next_frame = model.predict(current_sequence, verbose=0)[0]
        sequence.append(next_frame)

        # Обновляем буфер последовательности
        if MAX_FRAMES > 1:
            # Сдвигаем последовательность влево
            current_sequence[0, :-1] = current_sequence[0, 1:]
            # Добавляем новый кадр в конец
            current_sequence[0, -1] = next_frame

    return np.array(sequence)

# Загрузка одного видео для тестирования
def load_single_video(index=0):
    ds = tfds.load('moving_mnist', split='test')
    for i, example in enumerate(tfds.as_numpy(ds)):
        if i == index:
            return example['image_sequence'] / 255.0
    return None

# Визуализация результатов
def visualize_results(real_video, generated_video, num_frames=5):
    fig, axes = plt.subplots(2, num_frames, figsize=(15, 6))

    for i in range(num_frames):
        frame_idx = i * (len(real_video) // num_frames)

        # Реальные кадры
        axes[0, i].imshow(real_video[frame_idx, :, :, 0], cmap='gray')
        axes[0, i].set_title(f"Real Frame {frame_idx}")
        axes[0, i].axis('off')

        # Сгенерированные кадры
        axes[1, i].imshow(generated_video[frame_idx, :, :, 0], cmap='gray')
        axes[1, i].set_title(f"Generated Frame {frame_idx}")
        axes[1, i].axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Тестирование на одном видео
test_video = load_single_video(0)
if test_video is not None:
    first_frames = test_video[:10]
    real_video = test_video[:20]

    generated_video = generate_video_from_sequence(model, list(first_frames), total_frames=20)
    visualize_results(real_video, generated_video)

# Сохранение модели в оптимизированном формате
model.save("/content/drive/My Drive/checkweightsRNN/video_generation_model.keras", save_format='keras')

In [None]:
generated_video.shape

In [None]:
from IPython.display import HTML
from IPython import display
import matplotlib.pyplot as plt
import matplotlib.animation as animation

fig, ax = plt.subplots()
def animate(i):
  ax.clear()
  return ax.imshow(generated_video[i, :, :, 0], cmap='gray')
ani = animation.FuncAnimation(fig=fig,
                              func=animate,
                              frames=generated_video[:, 0, 0, 0].shape[0],
                              interval=1000/4,
                              blit=False)
HTML(ani.to_jshtml())

In [None]:
# 2. Определяем путь к модели
model_path = '/content/drive/My Drive/checkweightsRNN/video_generation_model.keras'

# 3. Определяем пользовательские объекты (custom objects)
#    Если в модели использовались кастомные слои, функции потерь или метрики
# custom_objects = {
#     'ssim_loss': ssim_loss,  # Ваша кастомная функция потерь
#     'PSNR': PSNR,            # Ваша кастомная метрика
#     'SSIM': SSIM,            # Ваша кастомная метрика
#     'TemporalAttention': TemporalAttention  # Ваш кастомный слой
# }

# 4. Загружаем модель
import tensorflow as tf
loaded_model = tf.keras.models.load_model( model_path
    # model_path,
    # custom_objects=custom_objects
)

# 5. Проверяем, что модель загружена корректно
loaded_model.summary()