# Question 3: LSTM

Following question 1 (2), train a sequence-to-sequence (e.g., 4 timesteps to 4 timesteps) predictive model (e.g., LSTM) in the reduced space, and decode the predicted results in the full space. Evaluate your algorithm's performance on the test dataset using different metrics (e.g., MSE, RMSE, SSIM, …).

Reference: https://towardsdatascience.com/multivariate-time-series-forecasting-with-deep-learning-3e7b3e2d2bcf

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

### Video sequence loading

In [None]:
# load the data as sequences
import os
import cv2 as cv
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

In [None]:
# custom dataset to process videos
class VideoDataset(Dataset):
  """Dataset of (input, target) frame-sequence pairs cut from fixed-length videos.

  Every video in ``folder_path`` is treated as ``video_length`` frames split
  into consecutive, non-overlapping chunks of ``sequence_length`` frames. One
  item pairs a chunk (input) with the chunk that immediately follows it
  (target), so each video contributes ``video_length // sequence_length - 1``
  items; the last chunk has no successor and is only ever used as a target.

  Args:
    folder_path: Directory containing the video files.
    sequence_length: Number of frames in each input/target sequence.
    video_length: Number of frames every video is expected to contain.
    transform: Optional callable applied to each 2-D grayscale frame before it
      is flattened (e.g. ``transforms.ToTensor()``, which scales uint8 pixels
      to [0, 1]). When ``None`` the raw 0..255 pixel values are returned.
  """

  def __init__(self, folder_path, sequence_length=4, video_length=16, transform=None):
    self.folder_path = folder_path
    self.sequence_length = sequence_length
    self.video_length = video_length
    self.transform = transform
    # os.listdir returns entries in arbitrary order; sort so that the
    # index -> video mapping is reproducible between runs.
    self.video_list = sorted(os.listdir(folder_path))

  def _pairs_per_video(self):
    """Return how many (input, target) pairs a single video provides."""
    # - 1 because the final chunk has no following chunk to act as its target.
    return max(self.video_length // self.sequence_length - 1, 0)

  def __len__(self):
    return len(self.video_list) * self._pairs_per_video()

  def _locate(self, idx):
    """Map a flat item index to ``(video index, first input frame)``."""
    # Must use the same per-video count as __len__, otherwise indices map to
    # the wrong video for any sequence_length/video_length other than 4/16.
    video_idx, pair_idx = divmod(idx, self._pairs_per_video())
    return video_idx, pair_idx * self.sequence_length

  def __getitem__(self, idx):
    """Return ``(input_seq, target_seq)`` float32 tensors for item ``idx``.

    Raises:
      IndexError: If ``idx`` is outside ``[0, len(self))``.
      ValueError: If the video holds too few frames for the requested pair.
    """
    if not 0 <= idx < len(self):
      raise IndexError(f"index {idx} out of range for dataset of size {len(self)}")

    video_idx, start_frame = self._locate(idx)
    video_path = os.path.join(self.folder_path, self.video_list[video_idx])
    frames = self._load_video(video_path)

    needed = start_frame + 2 * self.sequence_length
    if len(frames) < needed:
      raise ValueError(f"{video_path}: need at least {needed} frames, found {len(frames)}")

    input_seq, target_seq = self._get_sequence_pair(frames, start_frame)
    return torch.tensor(input_seq, dtype=torch.float32), torch.tensor(target_seq, dtype=torch.float32)

  def _load_video(self, video_path):
    """Read every frame of a video as a flattened grayscale vector.

    Returns:
      Array of shape ``(num_frames, H*W)``; for the videos this notebook
      expects that should be 16 x (128*128).
    """
    frames = []
    cap = cv.VideoCapture(video_path)
    try:
      while True:
        ret, frame = cap.read()
        if not ret:
          break
        frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        if self.transform is not None:
          # The transform was previously stored but never used, leaving
          # pixels in 0..255 while the rest of the notebook assumes [0, 1].
          frame = self.transform(frame)
        frames.append(np.asarray(frame).reshape(-1))
    finally:
      # Release the capture handle even if decoding or the transform fails.
      cap.release()

    return np.array(frames)

  def _get_sequence_pair(self, frames, frame_idx):
    """Slice an input chunk starting at ``frame_idx`` and the chunk after it."""
    split = frame_idx + self.sequence_length
    input_seq = np.array(frames[frame_idx:split])
    target_seq = np.array(frames[split:split + self.sequence_length])
    return input_seq, target_seq

In [None]:
# The only preprocessing step requested is conversion to a tensor
transform = transforms.Compose([transforms.ToTensor()])

# Where the training videos live and how they are cut up / batched
folder_path = "./VIDEOS/training/"
sequence_length = 4
batch_size = 1

video_dataset = VideoDataset(
    folder_path,
    sequence_length=sequence_length,
    transform=transform,
)
train_loader = DataLoader(video_dataset, shuffle=True, batch_size=batch_size)

### Seq2Seq model definition

In [None]:
# Define the seq2seq model
class Seq2SeqModel(nn.Module):
  """LSTM encoder-decoder that maps a sequence to a sequence of equal length.

  The encoder LSTM summarises the input sequence in its final hidden state.
  That summary is fed to the decoder LSTM at every output timestep, and a
  linear layer projects the decoder outputs back to ``input_dim``.

  Args:
    input_dim: Size of each input (and output) feature vector.
    latent_dim: Hidden size of both LSTMs.
    num_layers: Number of stacked layers in each LSTM.
  """

  def __init__(self, input_dim, latent_dim, num_layers=1):
    super().__init__()

    self.encoder = nn.LSTM(input_dim, latent_dim, num_layers, batch_first=True)
    self.decoder = nn.LSTM(latent_dim, latent_dim, num_layers, batch_first=True)
    self.fc = nn.Linear(latent_dim, input_dim)

  def forward(self, input_seq):
    """Predict an output sequence from ``input_seq``.

    Args:
      input_seq: Tensor of shape ``(batch, seq_len, input_dim)``.

    Returns:
      Tensor of shape ``(batch, seq_len, input_dim)``.
    """
    # Encoder: h_n has shape (num_layers, batch, latent_dim)
    _, (last_hidden, _) = self.encoder(input_seq)

    # Take the top layer's state and copy it along a new time axis so every
    # sample keeps its own context: (batch, latent) -> (batch, seq_len, latent).
    # Repeating h_n directly along dims 0/1 only gave the right shape for
    # batch == 1 and num_layers == 1, and mixed samples otherwise.
    context = last_hidden[-1]
    encoded = context.unsqueeze(1).repeat(1, input_seq.size(1), 1)

    # Decoder
    decoder_output, _ = self.decoder(encoded)

    return self.fc(decoder_output)

In [None]:
# Prefer the first CUDA device when requested and present, otherwise use the CPU
USE_GPU = True

device = torch.device('cuda:0' if (USE_GPU and torch.cuda.is_available()) else 'cpu')

print(device)

### Seq2Seq model training:

This LSTM autoencoder will be trained on the full image input: each 128x128 frame is flattened to a 16,384-element vector. This is not ideal, because the input to the LSTM is very high-dimensional. Later, I will experiment with reduced input dimensions.

In [None]:
# define function to train the model
def train(model, train_data, criterion, optimizer, epochs):
  """Train ``model`` for ``epochs`` passes over ``train_data``.

  Prints the mean batch loss after every epoch.

  Args:
    model: Module mapping an input sequence batch to a predicted batch.
    train_data: Sized iterable of ``(input_seq, target_seq)`` tensor pairs.
    criterion: Loss called as ``criterion(output_seq, target_seq)``.
    optimizer: Optimizer already bound to ``model``'s parameters.
    epochs: Number of passes over ``train_data``.
  """
  # Must be *called*: a bare `model.train` is an attribute lookup and leaves
  # the model in whatever mode (e.g. eval) it was in before.
  model.train()

  # Run on whatever device the model lives on instead of relying on a global.
  first_param = next(model.parameters(), None)
  run_device = first_param.device if first_param is not None else torch.device('cpu')

  num_batches = len(train_data)
  for epoch in range(epochs):
    running_loss = 0.0
    for input_seq, target_seq in train_data:
      input_seq = input_seq.to(run_device, dtype=torch.float32)
      target_seq = target_seq.to(run_device, dtype=torch.float32)
      optimizer.zero_grad()
      output_seq = model(input_seq)
      loss = criterion(output_seq, target_seq)
      loss.backward()
      optimizer.step()
      running_loss += loss.item()
    # An empty loader has no meaningful average; report nan instead of crashing.
    mean_loss = running_loss / num_batches if num_batches else float('nan')
    print(f"Epoch {epoch+1}, Loss: {mean_loss}")

In [None]:
# Hyperparameters for the full-resolution Seq2Seq model
input_dim = 128 ** 2  # length of one flattened 128x128 frame
latent_dim = 512
num_layers = 1
learning_rate = 1e-3
num_epochs = 100

In [None]:
# Build the Seq2Seq model on the selected device, with an MSE loss and Adam optimizer
model = Seq2SeqModel(input_dim, latent_dim, num_layers).to(device=device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Run the training loop on the full-resolution sequences
train(
    model=model,
    train_data=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=num_epochs,
)

### Evaluate on train set

To perform initial evaluations, I will load in the sequences of one video, stored in ./VIDEOS/small_test/, to visualise.

In [None]:
# Point the same dataset/loader pipeline at the small evaluation folder
folder_path = './VIDEOS/small_test/'
test_dataset = VideoDataset(
    folder_path,
    sequence_length=sequence_length,
    transform=transform,
)
test_loader = DataLoader(test_dataset, shuffle=True, batch_size=batch_size)

# Number of batches the loader will yield
print(len(test_loader))

In [None]:
import matplotlib.pyplot as plt

In [None]:
def decode_predictions(predictions, image_shape=(128, 128)):
  """Reshape flattened frames back into 2-D images.

  The batch size is read from the tensor itself and the sequence length is
  inferred, so the result no longer depends on module-level ``batch_size`` /
  ``sequence_length`` values and works for partial batches.

  Args:
    predictions: Tensor whose first dimension is the batch and whose remaining
      elements hold ``seq_len * H * W`` values, e.g. ``(batch, seq_len, H*W)``.
    image_shape: ``(H, W)`` of a single frame.

  Returns:
    Tensor of shape ``(batch, seq_len, H, W)``.
  """
  # reshape (unlike view) also accepts non-contiguous tensors.
  return predictions.reshape(predictions.size(0), -1, *image_shape)

def visualise_one_test_video(model, test_loader):
  """Plot predicted frames (top row) against target frames (bottom row).

  One figure is shown per batch yielded by ``test_loader``; only the first
  sequence of each batch is drawn.

  Args:
    model: Trained module mapping an input sequence batch to predictions.
    test_loader: Iterable of ``(input_seq, target_seq)`` tensor pairs.
  """
  model.eval()

  # Run on whatever device the model lives on instead of relying on a global.
  first_param = next(model.parameters(), None)
  run_device = first_param.device if first_param is not None else torch.device('cpu')

  def to_grayscale(frame):
    # Frames are assumed to hold intensities in [0, 1] (hence the * 255).
    # Clip first: casting out-of-range floats to uint8 wraps around and
    # renders as noise instead of saturating.
    return np.uint8(np.clip(frame.cpu().numpy(), 0.0, 1.0) * 255)

  with torch.no_grad():
    for input_seq, target_seq in test_loader:
      input_seq, target_seq = input_seq.to(run_device), target_seq.to(run_device)

      # Predict the next sequence
      output_seq = model(input_seq)
      # Reshape predictions and targets back to image dimensions
      decoded_predictions = decode_predictions(output_seq)
      decoded_targets = decode_predictions(target_seq)

      # Size the grid from the data so sequence lengths other than 4 work;
      # squeeze=False keeps `ax` 2-D even for a single column.
      num_frames = decoded_predictions.size(1)
      fig, ax = plt.subplots(2, num_frames, figsize=(3 * num_frames, 6), squeeze=False)

      for i in range(num_frames):
        # First sequence of the batch only
        ax[0, i].imshow(to_grayscale(decoded_predictions[0, i]), cmap='binary')
        ax[1, i].imshow(to_grayscale(decoded_targets[0, i]), cmap='binary')

      plt.show()

# Show predictions against targets for the sequences in the small test loader
visualise_one_test_video(model=model, test_loader=test_loader)

### Train model on reduced-dimension inputs

Clearly, passing in the flattened full image does not work well. Instead, I will first encode the input images with the CAE, run the LSTM encode/decode process on the CAE-encoded representation, and then decode the LSTM output with the CAE decoder. Reconstruction accuracy is computed on the resulting decoded images.

In [31]:
# Define the CAE_LSTM model
class CAE_LSTM(nn.Module):
  """Convolutional autoencoder wrapped around an LSTM encoder-decoder.

  Each frame is encoded by the CAE, the sequence of CAE feature vectors is
  passed through the LSTM encoder/decoder, and the predicted feature vectors
  are decoded back to frames by the CAE decoder.

  Args:
    input_dim: Number of pixels in one flattened square frame (``H * W`` with
      ``H == W``), e.g. ``128 * 128``. The side must be divisible by 4 because
      the CAE halves the resolution twice and then doubles it twice.
    cae_latent_dim: Number of channels produced by the CAE encoder.
    lstm_latent_dim: Hidden size of both LSTMs.
    num_layers: Number of stacked layers in each LSTM.

  Raises:
    ValueError: If ``input_dim`` is not the area of a square whose side is a
      multiple of 4.
  """

  def __init__(self, input_dim, cae_latent_dim, lstm_latent_dim, num_layers=1):
    super().__init__()

    side = int(round(input_dim ** 0.5))
    if side * side != input_dim or side % 4 != 0:
      raise ValueError(
        f"input_dim must be the area of a square frame with side divisible by 4, got {input_dim}"
      )
    self.image_side = side
    # Shape of one frame after the CAE encoder: two 2x2 max-pools -> side / 4.
    self.feature_shape = (cae_latent_dim, side // 4, side // 4)
    # The LSTM consumes flattened CAE features, not raw pixels, so it has to
    # be sized by the feature vector rather than by input_dim.
    # NOTE(review): for 128x128 frames and cae_latent_dim=64 this is 65,536
    # values per frame, i.e. larger than the 16,384 raw pixels.
    feature_dim = cae_latent_dim * (side // 4) ** 2

    # CAE encoder
    self.cae_encoder = nn.Sequential(
      nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2, stride=2),
      nn.Conv2d(16, cae_latent_dim, kernel_size=3, stride=1, padding=1),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2, stride=2)
    )

    # CAE decoder
    self.cae_decoder = nn.Sequential(
      nn.ConvTranspose2d(cae_latent_dim, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
      nn.ReLU(),
      nn.ConvTranspose2d(16, 1, kernel_size=3, stride=2, padding=1, output_padding=1),
      nn.Sigmoid()
    )

    # LSTM encoder
    self.lstm_encoder = nn.LSTM(feature_dim, lstm_latent_dim, num_layers, batch_first=True)

    # LSTM decoder
    self.lstm_decoder = nn.LSTM(lstm_latent_dim, lstm_latent_dim, num_layers, batch_first=True)

    # Fully-connected layer back to the CAE feature size
    self.fc = nn.Linear(lstm_latent_dim, feature_dim)

  def forward(self, x):
    """Predict a frame sequence from ``x``.

    Args:
      x: Tensor of shape ``(batch, seq_len, input_dim)`` of flattened frames.

    Returns:
      Tensor of shape ``(batch, seq_len, input_dim)`` with values in [0, 1]
      (the CAE decoder ends in a sigmoid).
    """
    n_batch, n_steps = x.shape[:2]
    side = self.image_side

    # Conv2d needs (N, C, H, W): fold time into the batch axis and restore the
    # single-channel image layout before encoding each frame.
    frames = x.reshape(n_batch * n_steps, 1, side, side)
    features = self.cae_encoder(frames).reshape(n_batch, n_steps, -1)

    # Run the encoded sequence through the LSTM encoder and repeat the top
    # layer's final state along time: (batch, latent) -> (batch, seq, latent).
    _, (last_hidden, _) = self.lstm_encoder(features)
    encoded = last_hidden[-1].unsqueeze(1).repeat(1, n_steps, 1)

    # Predicted CAE features for every output timestep
    decoder_output, _ = self.lstm_decoder(encoded)
    output = self.fc(decoder_output)

    # Back to (N, C, H/4, W/4) so the transposed convolutions can decode.
    output = output.reshape(n_batch * n_steps, *self.feature_shape)
    output = self.cae_decoder(output)
    return output.reshape(n_batch, n_steps, side * side)

In [None]:
# Settings specific to the CAE-LSTM run
num_epochs = 10
lstm_latent_dim = 512
cae_latent_dim = 64

# Build the CAE-LSTM on the selected device, with an MSE loss and Adam optimizer
model = CAE_LSTM(input_dim, cae_latent_dim, lstm_latent_dim, num_layers).to(device=device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Run the training loop for the CAE-LSTM model
train(
    model=model,
    train_data=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    epochs=num_epochs,
)