In [None]:
#Install and import dependencies
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
import matplotlib.pyplot as plt
import imageio

In [None]:
#Locate the GPUs and enable memory growth so TensorFlow allocates GPU memory
#on demand instead of reserving all of it up front
physical_devices = tf.config.list_physical_devices('GPU')
for gpu in physical_devices:
  try:
    #Applied to every GPU: TensorFlow requires the setting to match across devices
    tf.config.experimental.set_memory_growth(gpu, True)
  except (RuntimeError, ValueError) as err:
    #Best effort: report the problem (e.g. the device was already initialised)
    #and carry on with the default allocation behaviour
    print(f'Could not enable memory growth for {gpu}: {err}')

In [None]:
import gdown

In [None]:
#Download the data (skipped when the archive is already on disk, so re-running
#the notebook does not fetch it again), then unpack it
url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
output = 'data.zip'
if not os.path.exists(output):
  gdown.download(url, output, quiet = False)
gdown.extractall(output)

In [None]:
#Convert each video to frames
def load_video(path:str) -> "tf.Tensor":
  """Decode a video into cropped, grayscale, standardised frames.

  Each frame is converted to a single channel and cropped to rows 190:236 and
  columns 80:220. The stacked frames are then shifted and scaled by the mean
  and standard deviation computed over the whole clip.

  NOTE(review): cv2 decodes frames in BGR order while rgb_to_grayscale weights
  the channels as RGB; the original behaviour is kept here - confirm whether
  the channel order matters for this data.

  Args:
    path: Location of the video file to open with OpenCV.

  Returns:
    A float32 tensor with one entry per decoded frame; each entry is the
    46 x 140 x 1 crop when the source frames are large enough for the slice.

  Raises:
    ValueError: If no frame could be decoded from `path`.
  """
  cap = cv2.VideoCapture(path)
  frames = []
  try:
    for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
      ret, frame = cap.read()
      if not ret:
        #The reported frame count can exceed what is actually decodable
        break
      frame = tf.image.rgb_to_grayscale(frame)
      frames.append(frame[190:236, 80:220, :])
  finally:
    #Release the capture even if decoding or conversion fails
    cap.release()

  if not frames:
    raise ValueError(f'No frames could be read from {path}')

  #Normalise the data in float32. Doing the mean and the subtraction on the
  #raw uint8 frames truncates the mean and wraps values below it around.
  frames = tf.cast(tf.stack(frames), tf.float32)
  mean = tf.math.reduce_mean(frames)
  std = tf.math.reduce_std(frames)
  return (frames - mean) / std

In [None]:
#Every character we expect in a label: lowercase letters, three punctuation
#marks, the digits 1-9 and the space
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [None]:
#Convert sequences of characters to numeric codes and vice versa (tokenisation)

#Characters -> integer ids; the empty string is the out-of-vocabulary token
char_to_num = tf.keras.layers.StringLookup(
  vocabulary = vocab,
  oov_token = "",
)
#Integer ids -> characters, built from the forward lookup's own vocabulary
num_to_char = tf.keras.layers.StringLookup(
  vocabulary = char_to_num.get_vocabulary(),
  oov_token = "",
  invert = True,
)


In [None]:
#Load the video labels
def load_alignments(path:str) -> "tf.Tensor":
  """Read an alignment file and encode its words as character ids.

  Every line is split on whitespace and its third field is taken as the word.
  Words equal to 'sil' are dropped, as are lines with fewer than three fields.
  The remaining words are joined with single spaces and each character is
  mapped through `char_to_num`.

  Args:
    path: Location of the alignment text file.

  Returns:
    The result of `char_to_num` applied to the characters of the sentence,
    without a leading space.
  """
  with open(path, 'r') as f:
    lines = f.readlines()
  tokens = []
  for line in lines:
    fields = line.split()
    #Skip blank or truncated lines instead of failing on the missing word column
    if len(fields) < 3 or fields[2] == 'sil':
      continue
    #A space precedes every word; the very first one is sliced off below
    tokens.extend([' ', fields[2]])
  return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding = 'UTF-8'), (-1)))[1:]

In [None]:
#Load all the data
def load_data(path: "tf.Tensor"):
  """Load the frames and the encoded label for one sample.

  Only the base name of `path` is used: the video is always read from
  data/s1/<name>.mpg and the label from data/alignments/s1/<name>.align.

  Args:
    path: String tensor holding the path of a video file; `.numpy()` is called
      on it, so it must be an eager tensor.

  Returns:
    A `(frames, alignments)` pair as produced by `load_video` and
    `load_alignments`.
  """
  path = bytes.decode(path.numpy())
  #basename/splitext handle the platform's separators and keep dots inside the name
  file_name = os.path.splitext(os.path.basename(path))[0]
  video_path = os.path.join('data', 's1', f'{file_name}.mpg')
  alignment_path = os.path.join('data', 'alignments', 's1', f'{file_name}.align')
  frames = load_video(video_path)
  alignments = load_alignments(alignment_path)

  return frames, alignments

In [None]:
#Testing the load data function on a single sample.
#The path is wrapped in a tensor because load_data calls .numpy() on its argument.
test_path = "./data/s1/bbaf2n.mpg"
frames, alignments = load_data(tf.convert_to_tensor(test_path))

In [None]:
#Map the encoded label back to characters and show them as text
print([ch.decode() for ch in num_to_char(alignments.numpy()).numpy()])

['b', 'i', 'n', ' ', 'b', 'l', 'u', 'e', ' ', 'a', 't', ' ', 'f', ' ', 't', 'w', 'o', ' ', 'n', 'o', 'w']


In [None]:
from typing import Tuple
def mappable_function(path:str) -> Tuple[tf.Tensor, tf.Tensor]:
  """Run `load_data` through `tf.py_function` so it can be used in `Dataset.map`.

  Args:
    path: The element handed over by the dataset, forwarded to `load_data`.

  Returns:
    The two outputs of `load_data`, declared as float32 and int64.
  """
  return tf.py_function(load_data, [path], (tf.float32, tf.int64))

In [None]:
#Creating our data pipeline
#The files are listed in a fixed order and shuffled exactly once with a fixed
#seed. take()/skip() below only yield disjoint train/val/test splits if every
#pass over `data` sees the same order; with the default per-iteration
#reshuffling, samples would leak between the splits.
data = tf.data.Dataset.list_files('./data/s1/*.mpg', shuffle = False)
data = data.shuffle(len(data), seed = 42, reshuffle_each_iteration = False)
data = data.map(mappable_function)
#Batches of 2; frames are padded to 75 along the first axis, labels to length 40
data = data.padded_batch(2, padded_shapes = ([75, None, None, None], [40]))
data = data.prefetch(tf.data.AUTOTUNE)

#Split sizes are counted in batches because `data` is already batched
train_split, test_split, val_split = 0.6, 0.2, 0.2
num_samples = len(data)
train_num, test_num, val_num = int(num_samples * train_split), int(num_samples * test_split), int(num_samples * val_split)

train = data.take(train_num)
temp_data = data.skip(train_num)
val = temp_data.take(val_num)
#Everything left after train and val becomes the test split
test = temp_data.skip(val_num)

In [None]:
#Designing the deep neural network
#Importing classes
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
#Defining the model architecture
model = Sequential([
  #Three Conv3D -> ReLU -> MaxPool3D stages. The (1, 2, 2) pooling window has
  #size 1 on the first axis, so only the last two spatial axes are halved.
  Conv3D(128, 3, input_shape = (75, 46, 140, 1), padding = 'same'),
  Activation('relu'),
  MaxPool3D((1,2,2)),

  Conv3D(256, 3, padding = 'same'),
  Activation('relu'),
  MaxPool3D((1,2,2)),

  Conv3D(75, 3, padding = 'same'),
  Activation('relu'),
  MaxPool3D((1,2,2)),

  #Flatten each step along the first axis separately
  TimeDistributed(Flatten()),

  #Two bidirectional LSTM layers that return the full sequence, each followed
  #by dropout
  Bidirectional(LSTM(128, kernel_initializer = 'Orthogonal', return_sequences = True)),
  Dropout(.5),

  Bidirectional(LSTM(128, kernel_initializer = 'Orthogonal', return_sequences = True)),
  Dropout(.5),

  #Softmax with one more unit than the size of char_to_num's vocabulary
  Dense(char_to_num.vocabulary_size()+1, kernel_initializer = 'he_normal', activation = 'softmax'),
])


In [None]:
#Defining a learning rate scheduler
def scheduler(epoch, lr):
  """Keep the learning rate for the first 30 epochs, then decay it every epoch.

  Args:
    epoch: Epoch index supplied by the caller.
    lr: Current learning rate.

  Returns:
    `lr` unchanged while `epoch < 30`; otherwise `lr * exp(-0.1)` as a plain
    Python float.
  """
  if epoch < 30:
    return lr
  #Return a float rather than a tf.Tensor: LearningRateScheduler expects the
  #schedule function to produce a float
  return float(lr * np.exp(-0.1))

In [None]:
#Defining the CTC loss function
def CTCloss(y_true, y_pred):
  """Compute the CTC batch cost, using one shared length for every sample.

  Args:
    y_true: Batch of label sequences; axis 0 is read as the batch size and
      axis 1 as the label length.
    y_pred: Batch of predictions; axis 1 is read as the input length.

  Returns:
    The value of `tf.keras.backend.ctc_batch_cost` for the batch.
  """
  n_samples = tf.cast(tf.shape(y_true)[0], dtype = "int64")
  time_steps = tf.cast(tf.shape(y_pred)[1], dtype = "int64")
  label_width = tf.cast(tf.shape(y_true)[1], dtype = "int64")

  #Broadcast the scalar lengths into one (batch, 1) column each.
  #NOTE(review): label_width is the full second dimension of y_true, so any
  #padding in the labels is counted as part of the label - confirm intended.
  input_length = time_steps * tf.ones(shape=(n_samples, 1), dtype = "int64")
  label_length = label_width * tf.ones(shape=(n_samples, 1), dtype = "int64")

  return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [None]:
#Produce a video example with the true words and predicted words

class ProduceExample(tf.keras.callbacks.Callback):
  """Callback that prints label/prediction pairs for one batch after each epoch."""

  def __init__(self, dataset) -> None:
    """Store `dataset` and open a numpy iterator over it.

    Args:
      dataset: Dataset yielding `(inputs, labels)` batches.
    """
    #Let the base Callback initialise its own state
    super().__init__()
    #Kept so the iterator can be reopened once it runs out
    self._source = dataset
    self.dataset = dataset.as_numpy_iterator()

  def on_epoch_end(self, epoch, logs = None) -> None:
    """Predict on the next batch and print the original and decoded text."""
    try:
      data = next(self.dataset)
    except StopIteration:
      #More epochs than batches: start again from the beginning
      self.dataset = self._source.as_numpy_iterator()
      data = next(self.dataset)
    yhat = self.model.predict(data[0])
    #One sequence length per sample, taken from the prediction itself so any
    #batch size works
    input_length = [yhat.shape[1]] * yhat.shape[0]
    decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy = False)[0][0].numpy()
    for x in range(len(yhat)):
      print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
      print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
      print('~'*100)

In [None]:
#Compile with Adam (learning rate 0.001) and the custom CTC loss
model.compile(optimizer = Adam(learning_rate = 0.001), loss=CTCloss)

In [None]:
#Checkpoint callback: writes only the weights, under model/checkpoint
checkpoint = ModelCheckpoint(os.path.join('model', 'checkpoint'), monitor = 'loss', save_weights_only = True)

In [None]:
#Learning-rate callback driven by the scheduler function
schedule_callback = LearningRateScheduler(scheduler)

In [None]:
#Callback that prints example predictions, fed from the test split
example_callback = ProduceExample(test)

In [None]:
#batch_size is deliberately not passed: `train` is a tf.data.Dataset that
#already yields batches, and Keras documents that batch_size must not be
#specified for dataset inputs
history = model.fit(train, validation_data = val, epochs = 100, callbacks = [checkpoint, schedule_callback, example_callback], verbose = 1)

Epoch 1/100
 64/300 [=====>........................] - ETA: 3:38:18 - loss: 105.7146

In [None]:
#Open a numpy iterator over the test split
test_data = test.as_numpy_iterator()

In [None]:
#Pull one batch from the test iterator
sample = next(test_data)

In [None]:
#Predict on the first element of the batch (the model inputs)
yhat = model.predict(sample[0])



In [None]:
#Decode each label sequence of the batch back to text in one lookup per sentence
print('~'*50, 'Actual text')
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in sample[1]]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Actual text


[<tf.Tensor: shape=(), dtype=string, numpy=b'bin white in g zero now'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'lay white by l nine again'>]

In [None]:
#Greedy CTC decoding. The sequence lengths are read from the prediction itself
#(one entry per sample) instead of being hard-coded for a batch of two
decoded = tf.keras.backend.ctc_decode(yhat, input_length = [yhat.shape[1]] * yhat.shape[0], greedy = True)[0][0].numpy()

In [None]:
#Decode each predicted sequence back to text in one lookup per sentence
print('~'*50, 'Predictions')
[tf.strings.reduce_join(num_to_char(sentence)) for sentence in decoded]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Predictions


[<tf.Tensor: shape=(), dtype=string, numpy=b'bin white in g zero now'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'lay white by l nine again'>]

In [None]:
#Count the entries in the video folder, using the same relative data directory
#as the rest of the notebook rather than an environment-specific absolute path
len(os.listdir(os.path.join('data', 's1')))

1001