In [2]:
"""
  PyTorch ecosystem
  suits better than TensorFlow, easier to debug and make custom stuff
"""
import torch
import torch.nn as nn # building blocks of neural networks (layers, modules)
import torch.optim as optim # optimizers

"""
  TorchVision
"""
import torchvision.datasets as datasets # ready-to-go dataset
import torchvision.transforms as transforms # preprocessing, normalization

from torch.utils.data import DataLoader # standard training data pipeline

import numpy as np
import matplotlib.pyplot as plt

In [15]:
def load_dataset(root='./dataset', train=True, download=True):
  """
  Load the MNIST digits used as sprites for the Moving MNIST clips.

  Args:
    root: folder where the dataset files are stored.
    train: forwarded to datasets.MNIST as `train` (selects the split).
    download: forwarded to datasets.MNIST as `download`.

  Returns:
    The datasets.MNIST object. Images go through transforms.ToTensor(),
    i.e. they are converted to tensors scaled to [0, 1].
  """
  return datasets.MNIST(
      root=root, # save folder of the dataset
      train=train,
      download=download, # whether to download
      transform=transforms.ToTensor(), # transform to tensors [0,1]
  )


mnist = load_dataset()

100%|██████████| 9.91M/9.91M [00:00<00:00, 16.1MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 488kB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 4.41MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 3.71MB/s]


In [11]:
def moving_frames(batch_size=8, seq_len=5, image_size=64, dataset=None):
  """
  Generate a batch of synthetic "Moving MNIST" clips.

  Every clip shows one randomly chosen digit that slides over an empty
  image_size x image_size canvas with a constant velocity and bounces off
  the borders.

  Args:
    batch_size: number of clips in the batch.
    seq_len: number of frames in each returned sequence. seq_len + 1 frames
      are rendered per clip so that input and target can be shifted by one.
    image_size: side length of the square canvas, in pixels.
    dataset: indexable collection of (image, label) pairs where image is a
      tensor that squeezes to a 2-D sprite. Defaults to the notebook-level
      `mnist` object.

  Returns:
    (input_seq, target_seq): two float32 tensors of shape
    (batch_size, seq_len, 1, image_size, image_size).
    input_seq holds frames 0 .. seq_len-1, target_seq holds frames
    1 .. seq_len (the same clip shifted one step into the future).
    Take target_seq[:, -1] if only the final frame is needed.

  Raises:
    ValueError: if the digit sprite does not fit on the canvas.
  """
  if dataset is None:
    dataset = mnist # fall back to the dataset loaded at notebook level

  # Pre-allocated (batch, time, channel, height, width) buffer.
  # Note: np.zeros takes the shape as ONE tuple; a second positional
  # argument would be interpreted as the dtype.
  sequences = np.zeros(
      (batch_size, seq_len + 1, 1, image_size, image_size),
      dtype=np.float32,
  )

  for vid_idx in range(batch_size):

    # Step 1: random digit
    digit_idx = np.random.randint(len(dataset)) # random index
    digit_img, _ = dataset[digit_idx]
    digit_np = digit_img.squeeze().numpy()
    digit_h, digit_w = digit_np.shape

    # Largest top-left coordinate that keeps the sprite fully on the canvas
    max_x = image_size - digit_w
    max_y = image_size - digit_h
    if max_x < 0 or max_y < 0:
      raise ValueError(
          f"image_size={image_size} is smaller than the digit ({digit_h}x{digit_w})"
      )

    # Step 2: initial position and speed
    # randint's upper bound is exclusive, hence the +1 to reach max_x / max_y
    pos_x = np.random.randint(0, max_x + 1)
    pos_y = np.random.randint(0, max_y + 1)

    vel_x = np.random.randint(-2, 3) # -> -2 to 2
    vel_y = np.random.randint(-2, 3) # -> -2 to 2

    # Step 3: render the slideshow frame by frame
    for t in range(seq_len + 1):
      sequences[vid_idx, t, 0, pos_y:pos_y + digit_h, pos_x:pos_x + digit_w] = digit_np

      pos_x += vel_x
      pos_y += vel_y

      # Bounds: reverse direction when a border is reached or crossed
      if pos_x <= 0 or pos_x >= max_x:
        vel_x = -vel_x
      if pos_y <= 0 or pos_y >= max_y:
        vel_y = -vel_y

      # Position correction: never let the sprite leave the canvas
      pos_x = int(np.clip(pos_x, 0, max_x))
      pos_y = int(np.clip(pos_y, 0, max_y))

  # Input / target split: the same clip shifted by one frame.
  # .copy() gives each tensor its own contiguous memory.
  input_seq = torch.from_numpy(sequences[:, :-1].copy()) # frames 0 .. seq_len-1
  target_seq = torch.from_numpy(sequences[:, 1:].copy()) # frames 1 .. seq_len

  return input_seq, target_seq


In [5]:
"""
nn.Module
Base class for all neural network modules.

Your models should also subclass this class.
"""
class CNNEncoder(nn.Module):
  """
  Convolutional encoder that turns one frame into a feature vector.

  Pipeline: two Conv2d + BatchNorm2d + ReLU blocks, global average pooling
  over the spatial dimensions, then a Linear layer.

  Args:
    input_channels: number of colour channels of the input frame
      (1 for the grayscale MNIST data).
    feature_dim: how many numbers are used to describe one frame
      (length of the output vector).
  """

  def __init__(self, input_channels=1, feature_dim=512):
    super().__init__() # nn.Module init

    # Convolutional layers
    #   channels: 1 (initial image) -> 64 "detectors" (edges, corners,
    #             textures) -> 128 more advanced shapes
    #   kernel size: 1x1 = per-pixel operations, 3x3 = local patterns
    #             (edges, small features), 5x5 = wider patterns (not needed)
    #   nn.Conv1d -> audio, text; nn.Conv2d -> 2d images;
    #   nn.Conv3d -> video, 3d models
    # padding=1 with a 3x3 kernel keeps height and width unchanged.
    self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, padding=1)
    self.bn1 = nn.BatchNorm2d(64) # normalizes activations between layers

    self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
    self.bn2 = nn.BatchNorm2d(128)

    # Final summarizing layer: 128 pooled channel values -> feature_dim
    self.fc = nn.Linear(128, feature_dim)

  def forward(self, x):
    """
    Encode a batch of frames.

    Args:
      x: tensor of shape (batch, input_channels, H, W).

    Returns:
      Tensor of shape (batch, feature_dim).
    """
    # first block: (batch, C, H, W) -> (batch, 64, H, W)
    x = torch.relu(self.bn1(self.conv1(x)))

    # second block: (batch, 64, H, W) -> (batch, 128, H, W)
    x = torch.relu(self.bn2(self.conv2(x)))

    # Global average pooling: every feature map is averaged into a single
    # number, (batch, 128, H, W) -> (batch, 128, 1, 1)
    x = torch.nn.functional.adaptive_avg_pool2d(x, (1, 1))

    # Flatten: (batch, 128, 1, 1) -> (batch, 128)
    x = x.view(x.size(0), -1)

    # The linear layer must come AFTER pooling + flatten: nn.Linear acts on
    # the last dimension, which has to be the 128 channel values.
    return self.fc(x) # (batch, 128) -> (batch, feature_dim)

In [9]:
class VideoPredictor(nn.Module):
    """
    Predicts the next frame of a clip: CNN encoder -> LSTM -> MLP decoder.

    Args:
        input_channels: number of channels per frame (1 for grayscale).
        feature_dim: length of the per-frame feature vector produced by
            the CNN encoder and consumed by the LSTM.
        hidden_dim: size of the LSTM hidden state (the short-term memory).
        image_size: side length of the square frames the decoder paints.
            Defaults to 28; moving_frames() renders 64x64 frames by
            default, so pass image_size=64 for those.
    """

    def __init__(self,
                 input_channels=1,
                 feature_dim=256,
                 hidden_dim=128,
                 image_size=28):
        # Must run before any submodule is assigned to self
        super().__init__()

        self.input_channels = input_channels
        self.image_size = image_size

        # Our encoder: one feature vector of length feature_dim per frame
        self.cnn_encoder = CNNEncoder(
            input_channels=input_channels,
            feature_dim=feature_dim,
        )

        # LSTM - Long Short-Term Memory: an RNN with a memorization process
        #   1. Cell state   -> long-term memory
        #   2. Hidden state -> "right now" memory
        #   3. Gates:
        #      forget gate -> old information not needed, delete
        #      input gate  -> new information is needed, remember
        #      output gate -> "long story short" output
        # It consumes the encoder features, so input_size is feature_dim.
        self.lstm = nn.LSTM(
            input_size=feature_dim,
            hidden_size=hidden_dim,
            batch_first=True, # tensors are (batch, time, features)
        )

        # Decoder: a painter who draws the predicted frame
        #   Layer 1 -> expand to 512 to add details
        #   Layer 2 -> add even more details
        #   Layer 3 -> final conversion to pixels
        # ReLU keeps positive numbers and maps negative numbers to 0.
        # Sigmoid squashes any number into [0, 1], the range of the pixels.
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim, 512), # Layer 1
            nn.ReLU(),
            nn.Linear(512, 1024), # Layer 2
            nn.ReLU(),
            nn.Linear(1024, image_size * image_size * input_channels), # Layer 3
            nn.Sigmoid(),
        )

    def forward(self, x):
        """
        Predict the frame that follows the input clip.

        Args:
            x: tensor of shape (batch, seq_len, channels, height, width).

        Returns:
            Tensor of shape (batch, channels, height, width), values in [0, 1].

        Raises:
            ValueError: if the frame shape differs from the one the decoder
                was built for.
        """
        batch_size, seq_len, channels, height, width = x.shape

        expected = (self.input_channels, self.image_size, self.image_size)
        if (channels, height, width) != expected:
            raise ValueError(
                f"expected frames of shape {expected}, "
                f"got {(channels, height, width)}"
            )

        # Merge batch and time so the CNN processes all frames at once,
        # e.g. 8 videos x 5 frames = 40 frames. reshape (not view) also
        # accepts non-contiguous input.
        x_flat = x.reshape(batch_size * seq_len, channels, height, width)

        # Encode every frame: (batch * seq_len, feature_dim)
        features = self.cnn_encoder(x_flat)

        # Roll back to (batch, seq_len, feature_dim): the LSTM needs the
        # temporal structure
        features = features.view(batch_size, seq_len, -1)

        # The LSTM analyzes the motion pattern step by step:
        # (batch, seq_len, feature_dim) -> (batch, seq_len, hidden_dim)
        lstm_out, _ = self.lstm(features)

        # Only the last time step is used for the prediction:
        # (batch, seq_len, hidden_dim) -> (batch, hidden_dim)
        last_step = lstm_out[:, -1, :]

        # (batch, hidden_dim) -> (batch, channels * height * width)
        predict_flat = self.decoder(last_step)

        # Final reshape back into an image
        return predict_flat.view(batch_size, channels, height, width)

