In [191]:
import os
import cv2
import tensorflow as tf
import numpy as np
from typing import List
import gdown
from matplotlib import pyplot as plt
import imageio
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Conv3D, LSTM, Dense, Dropout, Bidirectional, MaxPool3D, Activation, Reshape, SpatialDropout3D, BatchNormalization, TimeDistributed, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler

# grid dataset

In [147]:
# url = 'https://drive.google.com/uc?id=1YlvpDLix3S-U8fd-gqRwPcWXAXm8JwjL'
# output = 'data.zip'
# gdown.download(url, output, quiet=False)
# gdown.extractall('data.zip')

In [192]:
def video_load(path:str) -> tf.Tensor:
    """Load a video, crop a fixed region of every frame and standardise it.

    Args:
        path: Filesystem path of the video, opened with `cv2.VideoCapture`.

    Returns:
        A float32 tensor of the cropped single-channel frames (rows 190:236,
        columns 80:220), with the mean subtracted and divided by the standard
        deviation computed over all frames.

    Raises:
        ValueError: If no frame could be decoded from `path`.
    """
    cap = cv2.VideoCapture(path)
    frames = []
    try:
        for _ in range(int(cap.get(cv2.CAP_PROP_FRAME_COUNT))):
            ret, frame = cap.read()
            if not ret:
                # The reported frame count can exceed what is decodable; stop
                # here instead of handing `None` to rgb_to_grayscale.
                break
            frame = tf.image.rgb_to_grayscale(frame)
            frames.append(frame[190:236,80:220,:])
    finally:
        # Release the capture even when decoding or conversion raises.
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {path}")

    # NOTE(review): the mean is reduced over the un-cast frames and subtracted
    # before the float cast; if the frames are uint8 this truncates the mean
    # and wraps negative differences. OpenCV also decodes as BGR, while
    # rgb_to_grayscale assumes RGB. Both are kept as-is because existing
    # checkpoints were presumably trained with this exact preprocessing --
    # confirm before changing.
    mean = tf.math.reduce_mean(frames)
    std = tf.math.reduce_std(tf.cast(frames, tf.float32))
    return tf.cast((frames - mean), tf.float32) / std
    
    # mean_image = np.zeros_like(frames[0])
    # for frame in frames:
    #     mean_image += frame
    #     mean_image /= len(frames)

In [193]:
# Every character the model can emit, one list entry per character.
vocab = list("abcdefghijklmnopqrstuvwxyz'?!123456789 ")

In [195]:
# Forward lookup: character -> integer id, with "" as the out-of-vocabulary token.
char_to_num = tf.keras.layers.StringLookup(oov_token="", vocabulary=vocab)
# Inverse lookup built from the forward layer's vocabulary: integer id -> character.
num_to_char = tf.keras.layers.StringLookup(
    invert=True,
    oov_token="",
    vocabulary=char_to_num.get_vocabulary(),
)

print(
    f"The vocabulary is: {char_to_num.get_vocabulary()} "
    f"(size ={char_to_num.vocabulary_size()})"
)

The vocabulary is: ['', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', "'", '?', '!', '1', '2', '3', '4', '5', '6', '7', '8', '9', ' '] (size =40)


In [196]:
def alignments_load(path:str) -> tf.Tensor:
    """Read an alignment file and encode its words as character ids.

    The third whitespace-separated field of each line is taken as the word;
    entries equal to 'sil' are dropped. Words are joined with single spaces
    and mapped through `char_to_num`.

    Args:
        path: Path of the alignment text file.

    Returns:
        A 1-D tensor of character ids, without the leading separator space.
    """
    with open(path, 'r') as f:
        lines = f.readlines()
    tokens = []
    for line in lines:
        fields = line.split()
        if len(fields) < 3:
            # Blank or truncated lines carry no word; skip them instead of
            # failing with IndexError.
            continue
        if fields[2] != 'sil':
            # Extend in place rather than rebuilding the list every iteration.
            tokens.extend([' ', fields[2]])
    # [1:] removes the id of the space that precedes the first word.
    return char_to_num(tf.reshape(tf.strings.unicode_split(tokens, input_encoding='UTF-8'), (-1)))[1:]

In [197]:
# try_path = "data/s1/bbaf4p.mpg"

In [153]:
# try_path.split('/')

In [154]:
# import os
# import shutil

# # Get the source and destination folder paths
# source_folder_path = "data/alignments/s1"
# destination_folder_path = "data/alignments"

# # Get a list of all files in the source folder
# files = os.listdir(source_folder_path)

# # Iterate over the files and move them to the destination folder
# for file in files:
#     shutil.move(os.path.join(source_folder_path, file), destination_folder_path)

# print("All files moved successfully!")


In [198]:
def data_load(path:str):
    """Load the frames and the encoded alignment for one sample.

    Only the file stem of `path` is used: the video is read from
    `data/s1/<stem>.mpg` and the alignment from `data/alignments/<stem>.align`.

    Args:
        path: Sample path as a string tensor (as supplied by `tf.py_function`
            or `tf.convert_to_tensor`); plain `str` and `bytes` also work.

    Returns:
        A `(frames, alignments)` tuple from `video_load` and `alignments_load`.
    """
    # Unwrap eager tensors, then decode bytes, so every input kind ends up as str.
    if hasattr(path, 'numpy'):
        path = path.numpy()
    if isinstance(path, bytes):
        path = path.decode()
    # basename handles the platform's path separators; the stem is everything
    # before the first dot, as before.
    name_file = os.path.basename(path).split('.')[0]
    video_path = os.path.join('data', 's1', f'{name_file}.mpg')
    alignments_path = os.path.join('data','alignments',f'{name_file}.align')
    frames = video_load(video_path)
    alignments = alignments_load(alignments_path)

    return frames, alignments

In [199]:
# Define the sample path in this cell so it runs on a fresh kernel; the only
# other assignment of `try_path` in the notebook is commented out.
try_path = "data/s1/bbaf4p.mpg"
frames, alignments = data_load(tf.convert_to_tensor(try_path))

In [200]:
def mappable_function(path:str) -> List[str]:
    """Wrap `data_load` in `tf.py_function` so it can be used with `Dataset.map`.

    The wrapped call is declared to yield a float32 tensor and an int64 tensor.
    """
    output_dtypes = (tf.float32, tf.int64)
    return tf.py_function(data_load, [path], output_dtypes)

In [158]:
# plt.imshow(frames[28])

# Data Pipeline

In [201]:
# List the files in a fixed order and shuffle exactly once. By default both
# list_files and shuffle reshuffle on every iteration, which would make the
# take/skip split below different each epoch and leak test samples into train.
data = tf.data.Dataset.list_files("./data/s1/*.mpg", shuffle=False)
data = data.shuffle(500, reshuffle_each_iteration=False)
data = data.map(mappable_function)
# Batches of 2: frames are padded to 75 on the first axis, labels to length 40.
data = data.padded_batch(2,padded_shapes=([75,None,None,None],[40]))
data = data.prefetch(tf.data.AUTOTUNE)

# The first 450 batches are for training; the remainder is held out.
train = data.take(450)
test = data.skip(450)

In [202]:
# Pull one batch from the pipeline as NumPy arrays for inspection.
frames, alignments = next(data.as_numpy_iterator())

In [161]:
# plt.imshow(frames[0])

In [162]:
# alignments

In [163]:
# test = data.as_numpy_iterator()
# val = test.next(); val[0]

In [164]:
# imageio.mimsave?

In [165]:
# imageio.mimsave('./animation.gif',val[0][1],10)

In [166]:
# plt.imshow(val[0][0][0])

In [167]:
# tf.strings.reduce_join([num_to_char(word) for word in val[1][0]])
# check=0
# sen=""
# for word in val[1][0]:
#     word = num_to_char(word)
#     sen+=word

In [168]:
# sen

''

# layers

In [169]:
# Loss function = CTC (Connectionist Temporal Classification)
# Works well for word transcriptions that are not aligned to frames; given the structure of this model, it will repeat the same letter or word multiple times.
# CTC reduces duplicates

In [203]:
# Shape of the first video in the first batch; it is used as the model's input_shape.
next(data.as_numpy_iterator())[0][0].shape

(75, 46, 140, 1)

In [204]:
model = Sequential()

# Three Conv3D -> ReLU -> MaxPool3D stages. Pooling is (1,2,2), so only the two
# spatial axes are halved and the first (75-long) axis is left intact.
for stage, n_filters in enumerate((128, 256, 75)):
    # Only the first layer declares the input shape.
    conv_kwargs = {'input_shape': (75,46,140,1)} if stage == 0 else {}
    model.add(Conv3D(n_filters, 3, padding='same', **conv_kwargs))
    model.add(Activation('relu'))
    model.add(MaxPool3D((1,2,2)))

# Flatten each step's feature map so the recurrent layers receive a sequence of vectors.
model.add(TimeDistributed(Flatten()))

# Two bidirectional LSTM layers, each followed by 50% dropout.
for _ in range(2):
    model.add(Bidirectional(LSTM(128, kernel_initializer='Orthogonal', return_sequences=True)))
    model.add(Dropout(.5))

# Per-step softmax over the vocabulary plus one extra class
# (presumably the CTC blank -- TODO confirm).
model.add(Dense(char_to_num.vocabulary_size()+1, kernel_initializer='he_normal', activation='softmax'))

In [205]:
# Print the layer-by-layer output shapes and parameter counts.
model.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d_12 (Conv3D)          (None, 75, 46, 140, 128   3584      
                             )                                   
                                                                 
 activation_12 (Activation)  (None, 75, 46, 140, 128   0         
                             )                                   
                                                                 
 max_pooling3d_12 (MaxPooli  (None, 75, 23, 70, 128)   0         
 ng3D)                                                           
                                                                 
 conv3d_13 (Conv3D)          (None, 75, 23, 70, 256)   884992    
                                                                 
 activation_13 (Activation)  (None, 75, 23, 70, 256)   0         
                                                      

In [173]:
# yhat = model.predict(val[0])

In [174]:
# yhat.shape

In [175]:
# sen=""
# for word in yhat[0]:
#     word = num_to_char(word)
#     sen+=word
# tf.strings.reduce_join([num_to_char(x) for x in tf.argmax(yhat[0],axis=1)])

In [176]:
# Show the model's input and output shapes, one per line.
print(model.input_shape, model.output_shape, sep='\n')


(None, 75, 46, 140, 1)
(None, 75, 41)


# Training

In [206]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for 30 epochs, then exponential decay.

    Args:
        epoch: Zero-based epoch index supplied by the callback.
        lr: Current learning rate.

    Returns:
        `lr` unchanged while `epoch < 30`; afterwards `lr * exp(-0.1)` as a
        plain Python float (not a tensor), which is what
        `LearningRateScheduler` expects from its schedule function.
    """
    if epoch < 30:
        return lr
    return float(lr * np.exp(-0.1))

In [207]:
def CTCLoss(y_true, y_pred):
    """CTC loss for a batch, via `tf.keras.backend.ctc_batch_cost`.

    Every sample is given the same input length (second dimension of
    `y_pred`) and the same label length (second dimension of `y_true`).

    Args:
        y_true: Batch of label sequences.
        y_pred: Batch of per-step class probabilities from the model.

    Returns:
        The loss tensor produced by `ctc_batch_cost`.
    """
    n_samples = tf.cast(tf.shape(y_true)[0], dtype="int64")
    # A (batch, 1) column of ones used to broadcast the scalar lengths per sample.
    per_sample = tf.ones(shape=(n_samples, 1), dtype="int64")

    input_length = tf.cast(tf.shape(y_pred)[1], dtype="int64") * per_sample
    label_length = tf.cast(tf.shape(y_true)[1], dtype="int64") * per_sample

    return tf.keras.backend.ctc_batch_cost(y_true, y_pred, input_length, label_length)

In [208]:
class ProduceExample(tf.keras.callbacks.Callback):
    """Callback that prints ground-truth and decoded text for one batch per epoch."""

    def __init__(self, dataset) -> None:
        """Store `dataset` and open a NumPy iterator over it.

        Args:
            dataset: A `tf.data.Dataset` yielding `(frames, labels)` batches.
        """
        super().__init__()
        # Keep the dataset itself so the iterator can be re-created once exhausted.
        self._source = dataset
        self.dataset = dataset.as_numpy_iterator()

    def _next_batch(self):
        """Return the next batch, restarting the iterator when it runs out."""
        try:
            return next(self.dataset)
        except StopIteration:
            # More epochs than batches: start another pass instead of
            # aborting training.
            self.dataset = self._source.as_numpy_iterator()
            return next(self.dataset)

    def on_epoch_end(self, epoch, logs=None) -> None:
        """Predict on one batch and print original vs. beam-search-decoded text."""
        data = self._next_batch()
        yhat = self.model.predict(data[0])
        # One input length per sample, taken from the prediction itself so a
        # smaller final batch also decodes correctly.
        input_length = [yhat.shape[1]] * len(yhat)
        decoded = tf.keras.backend.ctc_decode(yhat, input_length, greedy=False)[0][0].numpy()
        for x in range(len(yhat)):
            print('Original:', tf.strings.reduce_join(num_to_char(data[1][x])).numpy().decode('utf-8'))
            print('Prediction:', tf.strings.reduce_join(num_to_char(decoded[x])).numpy().decode('utf-8'))
            print('~'*100)

In [209]:
# Train with Adam at a 1e-4 learning rate against the CTC loss defined in this notebook.
optimizer = Adam(learning_rate=0.0001)
model.compile(loss=CTCLoss, optimizer=optimizer)



In [210]:
# Save weights only (not the full model) to models/checkpoint, monitoring the training loss.
checkpoint_path = os.path.join('models','checkpoint')
checkpoint_callback = ModelCheckpoint(checkpoint_path, monitor='loss', save_weights_only=True)

In [211]:
# Apply the `scheduler` function to the learning rate during training.
schedule_callback = LearningRateScheduler(schedule=scheduler)

In [212]:
# Print sample predictions from the held-out batches after every epoch.
example_callback = ProduceExample(dataset=test)

In [184]:
# model.fit(data, epochs=100, callbacks=[checkpoint_callback, schedule_callback, example_callback])

# Prediction

In [185]:
# url = 'https://drive.google.com/uc?id=1vWscXs4Vt0a_1IH1-ct2TCgXAZT-N3_Y'
# output = 'checkpoints.zip'
# gdown.download(url, output, quiet=False)
# gdown.extractall('checkpoints.zip', 'models')

In [219]:
# Restore weights from models/checkpoint (the same location the checkpoint
# callback writes to); the checkpoint files must already exist on disk.
model.load_weights('models/checkpoint')

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x29531f790>

In [220]:
# test_data = test.as_numpy_iterator()

In [221]:
# sample = test_data.next()

[mpeg1video @ 0x105065930] ac-tex damaged at 22 17


In [226]:
# yhat = model.predict(sample[0])



In [223]:
# decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75,75], greedy=True)[0][0].numpy()

In [227]:
# print('~'*100, 'REAL TEXT')
# [tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in sample[1]]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ REAL TEXT


[<tf.Tensor: shape=(), dtype=string, numpy=b'bin red in z six now'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'set green in o nine again'>]

In [228]:
# decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75,75], greedy=True)[0][0].numpy()

In [229]:
# print('~'*100, 'PREDICTIONS')
# [tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PREDICTIONS


[<tf.Tensor: shape=(), dtype=string, numpy=b'bin red in z six now'>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'set green in o nine again'>]

# Test

In [231]:
# sample = data_load(tf.convert_to_tensor('./data/s1/bbal8p.mpg'))

In [233]:
# print('~'*100, 'REAL TEXT')
# [tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in [sample[1]]]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ REAL TEXT


[<tf.Tensor: shape=(), dtype=string, numpy=b'bin blue at l eight please'>]

In [234]:
# yhat = model.predict(tf.expand_dims(sample[0], axis=0))



In [235]:
# decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0][0].numpy()

In [236]:
# print('~'*100, 'PREDICTIONS')
# [tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PREDICTIONS
tf.Tensor(b'bin blue at l eight please', shape=(), dtype=string)


In [239]:
def final_model(path):
    """Run the model on one video and return ground-truth and predicted text.

    Args:
        path: String tensor holding the sample path, as accepted by `data_load`.

    Returns:
        A `(real, pridict)` tuple. Each element is a one-item list holding a
        scalar string tensor: `real` is decoded from the alignment labels,
        `pridict` from the greedy CTC decoding of the model output.
    """
    frames, alignments = data_load(path)
    # The model expects a batch axis, so wrap the single video in a batch of one.
    yhat = model.predict(tf.expand_dims(frames, axis=0))
    decoded = tf.keras.backend.ctc_decode(yhat, input_length=[75], greedy=True)[0][0].numpy()
    # Ground truth comes from the alignment labels, not from the decoded
    # prediction (using `decoded` here made "real" always equal the prediction).
    real = [tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in [alignments]]
    predicted = [tf.strings.reduce_join([num_to_char(word) for word in sentence]) for sentence in decoded]

    return real, predicted

In [244]:
def final_output(path):
    """Print the real and predicted transcription for the video at `path`.

    Args:
        path: Video path as a Python string; it is converted to a tensor
            before being passed to `final_model`.
    """
    # Run inference once and reuse both results; previously the model ran
    # twice and element [0] was printed under both headings.
    real, predicted = final_model(tf.convert_to_tensor(path))
    print('~'*100, 'Real')
    print(real)
    print('~'*100, 'Pridiction')
    print(predicted)


In [245]:
# Demo: print the real and predicted text for a single sample video.
final_output("./data/s1/bbal8p.mpg")

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Real
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Real
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Pridiction
[<tf.Tensor: shape=(), dtype=string, numpy=b'bin blue at l eight please'>]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Pridiction
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Real
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Pridiction
[<tf.Tensor: shape=(), dtype=string, numpy=b'bin blue at l eight please'>]
