# Imports


In [1]:
import glob
import itertools
import numpy as np
import imageio
from IPython.display import Image
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import resnet
from tensorflow.keras import models
from tensorflow.keras.callbacks import TensorBoard

# Sanity check: report whether TensorFlow is in eager mode (the recorded cell output is True).
tf.executing_eagerly()

True

# Word and Phrases Extraction

In [None]:
# Collect the distinct identifiers encoded in the clip file names.
# Each name is split on "_": field 0 is recorded as the person, field 1 as
# the word.
words = set()
people = set()
for name in glob.glob("crema_cropped/*.gif"):
    # basename() drops the directory whatever separator glob returned; the
    # previous str.replace only handled a literal "crema_cropped/" prefix.
    name = os.path.basename(name)
    s = name.split("_")
    if len(s) < 2:
        # Not a "<person>_<word>_..." clip name: skip it instead of raising
        # IndexError on s[1].
        continue
    people.add(s[0])
    words.add(s[1])
# Sort so the list order does not depend on set iteration order.
words = sorted(words)
people = sorted(people)
len(people)

# Triplet Generation

In [16]:
# Shuffle the speakers in place, then cut the list at fixed offsets:
# the first 66 go to train, the next 11 to validation, the rest to test.
np.random.shuffle(people)
people_train = people[:66]
people_val = people[66:77]
people_test = people[77:]

def get_triplets(ppl, vocabulary=None, n_takes=6, root="crema_cropped"):
    """Enumerate (anchor, positive, negative) clip-path triplets for a set of people.

    For every person, word and take number the anchor is
    ``<root>/<person>_<word>_<take>.gif``. Positives are the other takes of
    the same person and word. Negatives are (a) every take of the same person
    saying a different word, followed by (b) every take of a different person
    from ``ppl`` saying the same word. The full anchor x positives x negatives
    product is emitted, and the resulting list is shuffled in place with
    ``np.random.shuffle`` before being returned.

    Args:
        ppl: iterable of person identifiers (strings) forming one split.
        vocabulary: iterable of word identifiers. When ``None`` (default) the
            module-level ``words`` list is used, as before.
        n_takes: number of takes per (person, word); takes are numbered
            ``1..n_takes``. Defaults to 6, the previously hard-coded value.
        root: directory prefix of the clip paths.

    Returns:
        A shuffled list of ``(anchor, positive, negative)`` tuples of path
        strings. The paths are generated from the naming scheme; the files
        are not checked for existence.
    """
    vocab = list(words if vocabulary is None else vocabulary)
    ppl = list(ppl)
    takes = [str(t) for t in range(1, n_takes + 1)]

    def clip(person, word, take):
        return f"{root}/{person}_{word}_{take}.gif"

    triplets = []
    for person in ppl:
        other_people = [p for p in ppl if p != person]
        for word in vocab:
            # The negatives depend only on (person, word), so build them once
            # instead of once per take.
            negatives = [
                clip(person, w, t) for w in vocab if w != word for t in takes
            ]
            negatives += [clip(p, word, t) for p in other_people for t in takes]
            for take in takes:
                anchor = clip(person, word, take)
                positives = [clip(person, word, t) for t in takes if t != take]
                triplets.extend(itertools.product([anchor], positives, negatives))
    np.random.shuffle(triplets)
    return triplets


# Build one triplet list per split, evaluated in train, val, test order,
# then display the three sizes.
triplets_train, triplets_val, triplets_test = (
    get_triplets(split) for split in (people_train, people_val, people_test)
)
(len(triplets_train), len(triplets_val), len(triplets_test))

(10834560, 498960, 498960)

# Image and Landmark Pre-Processing

In [24]:
def preprocess_image(filename):
    """Load a GIF file and return its decoded frames as a RaggedTensor.

    The file is read, decoded with ``tf.io.decode_gif``, converted to
    float32 with ``tf.image.convert_image_dtype``, given a leading axis of
    size 1 and wrapped with ``tf.RaggedTensor.from_tensor``.
    """
    raw_bytes = tf.io.read_file(filename)
    frames = tf.io.decode_gif(raw_bytes)
    frames = tf.image.convert_image_dtype(frames, tf.float32)
    with_leading_axis = tf.expand_dims(frames, 0)
    return tf.RaggedTensor.from_tensor(with_leading_axis)

def preprocess_images(anchor, positive, negative):
    """Load the three clips of one triplet.

    Each path is passed through ``preprocess_image``; the results are
    returned as an ``(anchor, positive, negative)`` tuple.
    """
    anchor_clip = preprocess_image(anchor)
    positive_clip = preprocess_image(positive)
    negative_clip = preprocess_image(negative)
    return (anchor_clip, positive_clip, negative_clip)


def preprocess_landmark(filename):
    """Load the landmark tensor stored next to a clip.

    The landmark file path is derived from ``filename`` by replacing the
    trailing ``.gif`` with ``.tf``. The file content is parsed with
    ``tf.io.parse_tensor`` as float32, its static shape is set to
    ``[None, 24, 2]``, and it is returned as a RaggedTensor with a leading
    axis of size 1 (the same wrapping ``preprocess_image`` applies).

    Args:
        filename: scalar string tensor holding the path of the ``.gif`` clip.

    Returns:
        ``tf.RaggedTensor`` built from the ``[1, None, 24, 2]`` float32 tensor.
    """
    # Raw string avoids the invalid "\." escape in a normal literal; the "$"
    # anchor restricts the substitution to the file extension instead of any
    # ".gif" occurring earlier in the path.
    landmark_path = tf.strings.regex_replace(filename, r'\.gif$', '.tf')
    serialized = tf.io.read_file(landmark_path)
    # parse_tensor already returns a float32 tensor, so no further conversion
    # is needed before fixing the static shape.
    landmarks = tf.io.parse_tensor(serialized, out_type=tf.float32)
    landmarks.set_shape([None, 24, 2])
    return tf.RaggedTensor.from_tensor(tf.expand_dims(landmarks, 0))

def preprocess_landmarks(anchor, positive, negative):
    """Load the landmark tensors of one triplet.

    Each path is passed through ``preprocess_landmark``; the results are
    returned as a list ``[anchor, positive, negative]``.
    """
    anchor_landmarks = preprocess_landmark(anchor)
    positive_landmarks = preprocess_landmark(positive)
    negative_landmarks = preprocess_landmark(negative)
    return [anchor_landmarks, positive_landmarks, negative_landmarks]

def squeeze(a, p, n):
    """Remove axis 1 from each of the three tensors and return them as a tuple."""
    squeezed_a = tf.squeeze(a, axis=1)
    squeezed_p = tf.squeeze(p, axis=1)
    squeezed_n = tf.squeeze(n, axis=1)
    return (squeezed_a, squeezed_p, squeezed_n)

def reshape(i, l):
    """Pair up the first three items of ``i`` and ``l`` position by position.

    Returns ``((i[0], l[0]), (i[1], l[1]), (i[2], l[2]))``.
    """
    first = (i[0], l[0])
    second = (i[1], l[1])
    third = (i[2], l[2])
    return (first, second, third)

# Creating train, validation and test datasets

In [25]:
def _build_triplet_dataset(triplets, batch_size=32, shuffle_buffer=1024):
    """Build the batched input pipeline for one split.

    Every element of the returned dataset has the structure
    ``((anchor_images, anchor_landmarks),
       (positive_images, positive_landmarks),
       (negative_images, negative_landmarks))``.

    Images and landmarks are loaded inside a single ``map`` over the shuffled
    path dataset. The previous version mapped the shuffled dataset twice (once
    for images, once for landmarks) and zipped the two branches; each branch
    iterates the shuffle independently, so the zipped images and landmarks
    could come from different triplets.

    Args:
        triplets: list of ``(anchor, positive, negative)`` path tuples.
        batch_size: number of triplets per batch (the last batch may be smaller).
        shuffle_buffer: buffer size passed to ``Dataset.shuffle``.

    Returns:
        A ``tf.data.Dataset`` yielding batches in the structure above.
    """
    # Convert the Python list once (it was converted three times per split
    # before); reshape keeps a well-formed (0, 3) array for an empty list.
    paths = np.array(triplets, dtype=str).reshape(-1, 3)
    dataset = tf.data.Dataset.from_tensor_slices(
        (paths[:, 0], paths[:, 1], paths[:, 2])
    )
    dataset = dataset.shuffle(buffer_size=shuffle_buffer)

    def load_triplet(anchor, positive, negative):
        images = preprocess_images(anchor, positive, negative)
        # tf.data does not treat a nested list as a structure, so turn the
        # list returned by preprocess_landmarks into a tuple.
        landmarks = tuple(preprocess_landmarks(anchor, positive, negative))
        return images, landmarks

    def regroup(images, landmarks):
        # Drop the per-sample axis of size 1 that the preprocess_* helpers
        # add, then pair every image batch with its landmark batch.
        return reshape(squeeze(*images), squeeze(*landmarks))

    dataset = dataset.map(load_triplet)
    dataset = dataset.batch(batch_size, drop_remainder=False)
    dataset = dataset.map(regroup)
    return dataset.prefetch(32)


# Train
dataset_train = _build_triplet_dataset(triplets_train)

# Val
dataset_val = _build_triplet_dataset(triplets_val)

# Test
dataset_test = _build_triplet_dataset(triplets_test)

In [2]:
# LipNet
# Embedding network with two inputs: an image sequence and a landmark
# sequence. A Conv3D/GRU branch processes the images, a Dense/LSTM branch
# processes the landmarks, and both are concatenated and projected to a
# 256-unit output.
# Per-sample input shapes are fixed here: (40, 18, 30, 3) for images and
# (40, 24, 2) for landmarks (presumably 40 frames per clip - confirm).
input_images = layers.Input((40, 18, 30, 3))
input_landmarks = layers.Input((40, 24, 2))

# --- Image branch, block 1 ---
# NOTE(review): zero1 is created but never consumed; conv1 below is applied
# to input_images directly with padding='same'. The recorded summary confirms
# conv1 is connected to input_1. Confirm whether zero1 was meant to feed conv1.
zero1 = layers.ZeroPadding3D(padding=(1, 2, 2), name='zero1')(input_images)
conv1 = layers.Conv3D(32, (3, 5, 5), strides=(1, 2, 2), padding='same', activation='relu', kernel_initializer='he_normal', name='conv1')(input_images)
# Pooling size/stride is 1 on the first axis, so only the last two spatial
# axes are reduced (recorded summary: conv1 (None, 40, 9, 15, 32) ->
# max1 (None, 40, 4, 7, 32)).
maxp1 = layers.MaxPooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2), name='max1')(conv1)
drop1 = layers.Dropout(0.5)(maxp1)

# --- Image branch, block 2: explicit zero padding followed by an unpadded conv ---
zero2 = layers.ZeroPadding3D(padding=(1, 2, 2), name='zero2')(drop1)
conv2 = layers.Conv3D(64, (3, 5, 5), strides=(1, 1, 1), activation='relu', kernel_initializer='he_normal', name='conv2')(zero2)
maxp2 = layers.MaxPooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2), name='max2')(conv2)
drop2 = layers.Dropout(0.5)(maxp2)

# --- Image branch, block 3 ---
zero3 = layers.ZeroPadding3D(padding=(1, 1, 1), name='zero3')(drop2)
conv3 = layers.Conv3D(96, (3, 3, 3), strides=(1, 1, 1), activation='relu', kernel_initializer='he_normal', name='conv3')(zero3)
maxp3 = layers.MaxPooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2), name='max3')(conv3)
drop3 = layers.Dropout(0.5)(maxp3)

# Flatten the feature map of every step into a vector so the GRUs receive a sequence.
resh1 = layers.TimeDistributed(layers.Flatten())(drop3)

# Two stacked bidirectional GRUs that return full sequences; the result is
# flattened across all steps into one vector per sample.
gru_1 = layers.Bidirectional(layers.GRU(256, return_sequences=True, kernel_initializer='Orthogonal', name='gru1'), merge_mode='concat')(resh1)
gru_2 = layers.Bidirectional(layers.GRU(256, return_sequences=True, kernel_initializer='Orthogonal', name='gru2'), merge_mode='concat')(gru_1)
flatten = layers.Flatten()(gru_2)

# --- Landmark branch ---
# Three per-step Dense layers (no activation argument is passed), a per-step
# Flatten, then an LSTM that returns only its final output.
td1 = layers.TimeDistributed(layers.Dense(4),name="td1")(input_landmarks)
td2 = layers.TimeDistributed(layers.Dense(8))(td1)
td3 = layers.TimeDistributed(layers.Dense(16))(td2)
td4 = layers.TimeDistributed(layers.Flatten(),name="t2")(td3)
l_lstm = layers.LSTM(64,return_sequences=False)(td4)
# Join the image and landmark feature vectors along the feature axis.
concat = tf.keras.layers.Concatenate(axis=1)([flatten,l_lstm])

# --- Head: batch norm, light dropout, then two Dense layers (no activation
# argument is passed) ending in the 256-unit embedding ---
b4_l2 = layers.BatchNormalization()(concat)
b4_l3 = layers.Dropout(.1)(b4_l2)
b4_l4 = layers.Dense(512)(b4_l3)
b4_l5 = layers.Dense(256)(b4_l4)

# The embedding model takes an (images, landmarks) pair and returns b4_l5.
embedding = Model(inputs=(input_images, input_landmarks), outputs=b4_l5)
embedding.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 40, 18, 30,  0                                            
__________________________________________________________________________________________________
conv1 (Conv3D)                  (None, 40, 9, 15, 32 7232        input_1[0][0]                    
__________________________________________________________________________________________________
max1 (MaxPooling3D)             (None, 40, 4, 7, 32) 0           conv1[0][0]                      
__________________________________________________________________________________________________
dropout (Dropout)               (None, 40, 4, 7, 32) 0           max1[0][0]                       
______________________________________________________________________________________________

# Plot of the Network Architecture : <br>
<img src="https://github.com/ab2llah/WhisperNet/raw/main/whisper-lipnet.png">

# Siamese Network

In [28]:
class DistanceLayer(layers.Layer):
    """Compute the two distances used by the triplet loss.

    Given the anchor, positive and negative embeddings, the layer returns the
    squared Euclidean distance (sum of squared differences over the last
    axis) between anchor and positive, and between anchor and negative.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        # Anchor <-> positive distance.
        positive_gap = tf.square(anchor - positive)
        ap_distance = tf.reduce_sum(positive_gap, -1)
        # Anchor <-> negative distance.
        negative_gap = tf.square(anchor - negative)
        an_distance = tf.reduce_sum(negative_gap, -1)
        return (ap_distance, an_distance)

# Symbolic image inputs for the three members of a triplet. The first
# per-sample axis is left as None.
anchor_input_image = layers.Input(shape=(None, 18, 30, 3), name="anchor_image")
positive_input_image = layers.Input(shape=(None, 18, 30, 3), name="positive_image")
negative_input_image = layers.Input(shape=(None, 18, 30, 3), name="negative_image")

# Matching landmark inputs.
anchor_input_landmark = layers.Input(shape=(None, 24, 2), name="anchor_landmark")
positive_input_landmark = layers.Input(shape=(None, 24, 2), name="positive_landmark")
negative_input_landmark = layers.Input(shape=(None, 24, 2), name="negative_landmark")

# The same `embedding` model (shared weights) encodes all three
# (image, landmark) pairs; DistanceLayer turns the three embeddings into the
# (anchor-positive, anchor-negative) distance pair.
distances = DistanceLayer()(
    embedding((anchor_input_image, anchor_input_landmark)),
    embedding((positive_input_image, positive_input_landmark)),
    embedding((negative_input_image, negative_input_landmark)),
)

# Inputs are three (image, landmark) pairs in anchor, positive, negative order.
siamese_network = Model(
    inputs=[
        (anchor_input_image, anchor_input_landmark),
        (positive_input_image, positive_input_landmark),
        (negative_input_image, negative_input_landmark),
    ],
    outputs=distances,
)


class SiameseModel(Model):
    """The Siamese Network model with custom training and testing loops.

    Computes the triplet loss from the two distances produced by the wrapped
    Siamese network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    i.e. the loss is zero once d(a,p) - d(a,n) + margin <= 0.

    The metric reported as "accuracy" is the soft score
    1 / (1 + exp(2 * (d(a,p) - d(a,n)))), averaged by a Mean tracker. It is
    above 0.5 when the positive is closer to the anchor than the negative.

    Args:
        siamese_network: model mapping a triplet batch to the pair
            (anchor-positive distance, anchor-negative distance).
        margin: margin added to the distance gap before clamping at zero.
    """

    def __init__(self, siamese_network, margin=0.5):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")
        self.accuracy_tracker = metrics.Mean(name="accuracy")
        # Last computed distance gap d(a,p) - d(a,n); written by
        # _compute_loss and read by _compute_accuracy.
        self.val = 0

    def call(self, inputs, training=None):
        """Forward a triplet batch through the wrapped network.

        Accepts the structure the training loop feeds to the network. A dict
        with "images" and "landmarks" keys is still forwarded as
        ``[inputs["images"], inputs["landmarks"]]`` exactly as before;
        previously any non-dict input failed on the key lookup.
        """
        if isinstance(inputs, dict):
            inputs = [inputs["images"], inputs["landmarks"]]
        return self.siamese_network(inputs, training=training)

    def train_step(self, data):
        with tf.GradientTape() as tape:
            # training=True so Dropout and BatchNormalization inside the
            # network run in training mode; a custom train_step has to pass
            # the flag itself.
            loss = self._compute_loss(data, training=True)

        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )
        # Uses the distance gap stored by the forward pass above.
        accuracy = self._compute_accuracy(data)
        self.loss_tracker.update_state(loss)
        self.accuracy_tracker.update_state(accuracy)
        return {"loss": self.loss_tracker.result(), "accuracy": self.accuracy_tracker.result()}

    def test_step(self, data):
        # Evaluation runs the network explicitly in inference mode.
        loss = self._compute_loss(data, training=False)
        accuracy = self._compute_accuracy(data)

        self.loss_tracker.update_state(loss)
        self.accuracy_tracker.update_state(accuracy)
        return {"loss": self.loss_tracker.result(), "accuracy": self.accuracy_tracker.result()}

    def _compute_loss(self, data, training=None):
        """Return the per-sample triplet loss and remember the distance gap.

        ``training`` is forwarded to the wrapped network; the default
        ``None`` keeps the behavior of a call without the flag.
        """
        ap_distance, an_distance = self.siamese_network(data, training=training)
        gap = ap_distance - an_distance
        self.val = gap
        return tf.maximum(gap + self.margin, 0.0)

    def _compute_accuracy(self, val):
        """Return the soft score for the most recent ``_compute_loss`` call.

        The ``val`` argument is ignored (kept for signature compatibility);
        the score is computed from ``self.val``.
        """
        return 1 / (1 + tf.exp(2 * self.val))

    @property
    def metrics(self):
        # Listing the trackers here lets Keras reset them at the start of
        # every epoch and evaluation run.
        return [self.loss_tracker, self.accuracy_tracker]

from tensorflow.keras.callbacks import ModelCheckpoint

# Save weights only, and only when the monitored validation loss reaches a
# new minimum.
checkpoint = ModelCheckpoint(
    filepath="Models/crema_combined0.tf",
    monitor="val_loss",
    mode="min",
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

# Wrap the siamese network in the custom training model (default margin)
# and attach an Adam optimizer.
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(learning_rate=0.001))

# Training Process

In [None]:
NAME = "crema_combined0"
tensorboard = TensorBoard(log_dir=f"logs/{NAME}")

# Each "epoch" is 16 training batches followed by 4 validation batches.
# `shuffle` and `use_multiprocessing` were dropped: Keras ignores both for
# tf.data.Dataset inputs, and newer Keras versions no longer accept
# `use_multiprocessing` in fit().
siamese_model.fit(
    dataset_train,
    epochs=500,
    verbose='auto',
    steps_per_epoch=16,
    callbacks=[checkpoint, tensorboard],
    validation_data=dataset_val,
    validation_steps=4,
)

