# Prototype of lipreading pipeline

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim

#from torchtext.legacy.datasets import Multi30k
#from torchtext.legacy.data import Field, BucketIterator

#import spacy
import numpy as np

import random
import math
import time

# model file, encoder, decoder and seqtoseq
from model import *
# utils file
from utils import *
# Get landmark using vocadataset.py
from data.vocaset import *

# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [7]:
class Encoder(nn.Module):
    """Single-layer bidirectional RNN encoder over flattened landmark frames.

    Args:
        INPUT_DIM: Number of features per frame (68 * 3 in this notebook).
        EMB_DIM: Unused; kept for signature compatibility because no input
            embedding is applied yet.
        HID_DIM: Hidden size of each RNN direction.
        N_LAYERS: Unused; the RNN always has exactly one layer.
        DROPOUT: Unused; no dropout is applied in the encoder.
    """

    def __init__(self, INPUT_DIM, EMB_DIM, HID_DIM, N_LAYERS, DROPOUT):
        super().__init__()

        self.HID_DIM = HID_DIM

        # Size the RNN from the constructor arguments instead of hard-coded
        # 68*3 / 128, so the module honours whatever dimensions it is given.
        self.rnn = nn.RNN(
            input_size=INPUT_DIM,
            hidden_size=HID_DIM,
            bidirectional=True,
            batch_first=True,
        )

    def forward(self, x):
        """Encode a landmark sequence and return the final hidden state.

        Args:
            x: Float tensor of shape [batch, seq_len, INPUT_DIM]
                (batch-first), or [seq_len, INPUT_DIM] when unbatched.

        Returns:
            The RNN's final hidden state, one slice per direction:
            [2, batch, HID_DIM] for batched input, [2, HID_DIM] for
            unbatched input. The per-step outputs are discarded.
        """
        _, hid = self.rnn(x)
        return hid



In [8]:
class Decoder(nn.Module):
    """Single-step bidirectional RNN decoder that scores vocabulary symbols.

    The previous symbol index is fed to the RNN as a single scalar feature
    (no embedding layer yet), and the RNN output is projected to
    ``output_dim`` scores.

    Args:
        output_dim: Number of output classes (vocabulary size).
        EMB_DIM: Unused; kept for signature compatibility.
        HID_DIM: Hidden size of each RNN direction.
        N_LAYERS: Unused; the RNN always has exactly one layer.
        DROPOUT: Dropout probability stored in ``self.dropout``.
    """

    def __init__(self, output_dim, EMB_DIM, HID_DIM, N_LAYERS, DROPOUT):
        super().__init__()

        self.output_dim = output_dim
        self.HID_DIM = HID_DIM

        # hidden_size must follow HID_DIM: fc_out below is sized as 2*HID_DIM,
        # so a hard-coded 128 breaks for every other hidden size.
        self.rnn = nn.RNN(
            input_size=1,
            hidden_size=HID_DIM,
            bidirectional=True,
            batch_first=True,
        )

        # Both directions are concatenated, hence the factor 2.
        self.fc_out = nn.Linear(2 * HID_DIM, output_dim)

        # NOTE: not applied in forward(); kept so the attribute stays available.
        self.dropout = nn.Dropout(DROPOUT)

    def forward(self, input, hidden):
        """Run the decoder RNN and project its output to class scores.

        Args:
            input: Tensor of symbol indices shaped [batch, seq_len, 1]
                (batch-first); it is cast to float32 before entering the RNN.
            hidden: Initial hidden state of shape [2, batch, HID_DIM].

        Returns:
            A ``(prediction, hidden)`` tuple. ``prediction`` holds unnormalised
            scores; the leading axis of the RNN output is squeezed away when
            it has size 1, so a [1, 1, 1] input yields [1, output_dim].
            ``hidden`` is the updated state, [2, batch, HID_DIM].
        """
        output, hidden = self.rnn(input.to(torch.float32), hidden)

        # squeeze(0) only removes the leading axis when it has size 1.
        prediction = self.fc_out(output.squeeze(0))

        return prediction, hidden

In [9]:
class Seq2Seq(nn.Module):
    """Greedy encoder-decoder wrapper that emits one prediction per source step.

    Args:
        encoder: Module whose forward returns the hidden state used to seed
            the decoder.
        decoder: Module exposing ``output_dim`` and a
            ``forward(input, hidden)`` returning ``(prediction, hidden)``.
        device: Device on which the output buffer and the start token are
            created.
        sos_index: Vocabulary index fed to the decoder at the first step.
            Defaults to 31, the index of the start symbol '@' in this
            notebook's vocabulary.
    """

    def __init__(self, encoder, decoder, device, sos_index=31):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        self.sos_index = sos_index

    def forward(self, src, trg):
        """Decode greedily, feeding each step's arg-max back into the decoder.

        Args:
            src: Source sequence passed to the encoder. The number of decoding
                steps is ``src.shape[0]``.
                NOTE(review): the original FIXME says the landmarks may not
                have been passed with the batch dimension first; ``shape[0]``
                is the frame count only for unbatched [n_frames, features]
                input, which is how this notebook calls the model.
            trg: Target tensor; only ``trg.shape[0]`` is read, as the batch
                size. No teacher forcing is performed.

        Returns:
            Tensor of shape [src.shape[0], batch_size, decoder.output_dim]
            holding the decoder scores of every step. Row 0 stays all zeros
            because decoding starts at step 1.
        """
        batch_size = trg.shape[0]
        trg_len = src.shape[0]
        trg_vocab_size = self.decoder.output_dim

        # Buffer for the decoder scores, allocated directly on the target device.
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size, device=self.device)

        # The encoder's final hidden state seeds the decoder; unsqueeze(1)
        # inserts the batch axis the decoder RNN expects.
        hidden = self.encoder(src).unsqueeze(1)

        # First decoder input is the start symbol, shaped [1, 1, 1]. It is
        # created on self.device so it matches a model moved off the CPU.
        decoder_input = torch.tensor([[[self.sos_index]]], device=self.device)

        for t in range(1, trg_len):
            prediction, hidden = self.decoder(decoder_input, hidden)

            # Add a leading step axis: [1, batch, vocab].
            step_output = prediction.unsqueeze(0)
            outputs[t] = step_output

            # Greedy choice: the best-scoring symbol becomes the next input.
            decoder_input = step_output.argmax(2).unsqueeze(0)

        return outputs

In [2]:
# Print the vocabulary
# `vocabulary` is not defined in this notebook; presumably it comes from the
# star-imports at the top (TODO confirm). It is called with '-' as the blank,
# '@' as the start and '#' as the stop symbol.
# NOTE(review): a later cell rebinds the name `vocabulary` to the returned
# list; once that has run, this call fails until the imports are re-executed.
print(vocabulary(blank='-', start='@', stop='#'))

['-', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '.', '?', ',', '!', '@', '#', ' ']


In [39]:
# Get landmarks from the vocadataset class
# NOTE(review): `vocadataset` is presumably provided by `data.vocaset`; the
# meaning of "train" / landmark=True is defined there — confirm.
trainset = vocadataset("train", landmark=True)
# The first item unpacks into a landmark tensor (reshaped over three axes in
# later cells) and its transcript (printed as text in a later cell).
landmark, labels = trainset[0]

In [14]:
# Sanity-check the transcript before and after adding the sequence markers
print("Before:", labels)
labels = f"@{labels}#"  # wrap with the start ('@') and stop ('#') characters
print("After:", labels)

Before: she had your dark suit in greasy wash water all year
After: @she had your dark suit in greasy wash water all year#


In [40]:
# Hyper-parameters handed to the Encoder / Decoder constructors.
INPUT_DIM = 68*3  # features per frame after flattening (see the reshape below)
HID_DIM = 128
EMB_DIM = N_LAYERS = DROPOUT = 0

# Stand-alone encoder check, left disabled (original note: "to be deleted??"):
#model = Encoder(INPUT_DIM, EMB_DIM, HID_DIM, N_LAYERS, DROPOUT)
#out = model(reshaped_landmark)

# Flatten the two trailing axes of every frame into one feature vector.
reshaped_landmark = landmark.reshape(landmark.shape[0], landmark.shape[1] * landmark.shape[2])

# Synthetic boundary frames: all zeros marks the start, all ones the stop.
start_landmark = torch.zeros((1, INPUT_DIM))
stop_landmark = torch.ones((1, INPUT_DIM))

# Sequence with the boundary frames attached along the frame axis.
final_landmarks = torch.cat([start_landmark, reshaped_landmark, stop_landmark], dim=0)

In [4]:
# Build the symbol list once. The result deliberately reuses the name
# `vocabulary` (later cells index it as a list), which shadows the factory
# function; the guard keeps this cell safe to re-run, since calling the
# already-built list would raise "'list' object is not callable".
if callable(vocabulary):
    vocabulary = vocabulary(blank='-', start='@', stop='#')

In [5]:
# Map every vocabulary symbol to its position in the vocabulary list
char_to_index = dict(zip(vocabulary, range(len(vocabulary))))

In [18]:
# Display the symbol -> index mapping so it can be checked against the vocabulary order
print(char_to_index)

{'-': 0, 'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12, 'm': 13, 'n': 14, 'o': 15, 'p': 16, 'q': 17, 'r': 18, 's': 19, 't': 20, 'u': 21, 'v': 22, 'w': 23, 'x': 24, 'y': 25, 'z': 26, '.': 27, '?': 28, ',': 29, '!': 30, '@': 31, '#': 32, ' ': 33}


In [19]:
# Encode the transcript as vocabulary indices, one integer per character.
# A character missing from the mapping raises KeyError.
target_indices = list(map(char_to_index.__getitem__, labels))
target_tensor = torch.tensor(target_indices)

In [41]:
# One output class per vocabulary symbol.
output_dim = len(vocabulary)

# Assemble the encoder-decoder model; it is kept on the CPU for now.
enc = Encoder(
    INPUT_DIM=INPUT_DIM,
    EMB_DIM=EMB_DIM,
    HID_DIM=HID_DIM,
    N_LAYERS=N_LAYERS,
    DROPOUT=DROPOUT,
)
dec = Decoder(
    output_dim=output_dim,
    EMB_DIM=EMB_DIM,
    HID_DIM=HID_DIM,
    N_LAYERS=N_LAYERS,
    DROPOUT=DROPOUT,
)
model = Seq2Seq(encoder=enc, decoder=dec, device='cpu')#.to(device)

# Flatten the two trailing axes of every frame into one feature vector.
reshaped_landmark = landmark.reshape(landmark.shape[0], landmark.shape[1] * landmark.shape[2])

# Synthetic boundary frames: all zeros marks the start, all ones the stop.
start_landmark = torch.zeros((1, 68*3))
stop_landmark = torch.ones((1, 68*3))

# Sequence with the boundary frames attached along the frame axis.
final_landmarks = torch.cat([start_landmark, reshaped_landmark, stop_landmark], dim=0)



In [None]:
# Define the CTC loss function (default blank index 0 matches '-' in the vocabulary)
ctc_loss = nn.CTCLoss()

# Define the optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# The single training target never changes, so its length is computed once.
# CTCLoss expects one length per batch element, i.e. shape (1,) here.
target_lengths = torch.tensor([target_tensor.size(0)], dtype=torch.long)

# Training loop: repeatedly fit the one (landmark, transcript) sample.
num_epochs = 10000
for epoch in range(num_epochs):
    optimizer.zero_grad()
    # The target is only used by the model to read the batch size (1).
    output = model(reshaped_landmark, target_tensor[None, :])

    # nn.CTCLoss expects log-probabilities, but the model returns raw linear
    # scores, so normalise over the class axis first.
    log_probs = output.log_softmax(dim=2)

    # One input length per batch element: the number of decoded steps.
    input_lengths = torch.full((1,), output.size(0), dtype=torch.long)

    loss = ctc_loss(log_probs, target_tensor, input_lengths, target_lengths)
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}")
        # Greedy per-step decoding, printed raw (repeats and blanks included).
        e = torch.argmax(output, dim=2).squeeze(1)
        output_sequence = ''.join(vocabulary[index] for index in e.tolist())
        print(output_sequence)



In [None]:
# Greedy decoding: keep the best-scoring vocabulary symbol at every step
output_indices = output.argmax(dim=2).squeeze(1)
output_sequence = ''.join(vocabulary[index] for index in output_indices)

# Show the reference without its start/stop markers next to the prediction.
# NOTE(review): `process_string` is not defined in this notebook; presumably
# it comes from `utils` and post-processes the raw decoded string — confirm.
print("Target Sequence:", labels.translate(str.maketrans("", "", "@#")))
print("Decoded Output:", process_string(output_sequence))