In [4]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import string
import h5py
import torch.nn.functional as F
import string
import re
import sys
import pandas as pd
from torch.nn.utils.rnn import pad_sequence

from datetime import datetime

In [5]:
import json


def _load_vocab(path):
    """Parse the JSON file at ``path`` and return the resulting object."""
    with open(path) as vocab_file:
        return json.load(vocab_file)


# Character-level and word-level lookup tables consumed by the tokenizer.
vocabulary = _load_vocab('char_vocab.json')
word_vocabulary = _load_vocab('word_vocab.json')

In [6]:
# Load the sentence table; the file is tab-separated despite its .csv extension.
ds = pd.read_csv("how2sign_realigned_train.csv", sep="\t")

In [7]:
# Concatenate every sentence so the next cells can check that the sentinel
# characters '#' and '*' never occur in the raw corpus.
texts ="".join(ds["SENTENCE"])

In [8]:
"#" in texts #SOS

False

In [10]:
"*" in texts #EOS

False

In [11]:
# Wrap each sentence in the '#' (start) and '*' (end) sentinels and join
# everything into a single corpus string in one pass.
texts = "".join(f"#{sentence}*" for sentence in ds['SENTENCE'])

## Vocabulary and tokenizer

In [12]:
import string

# Accepted alphabet: lowercase letters, digits, a fixed set of punctuation,
# then colon, space and newline.
accepted_text = "".join([
    string.ascii_lowercase,
    string.digits,
    '!?.,\"()&+-/@%–',
    ":",
    " ",
    "\n",
])
chars = list(accepted_text)
print(chars)

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '!', '?', '.', ',', '"', '(', ')', '&', '+', '-', '/', '@', '%', '–', ':', ' ', '\n']


In [13]:
class Tokenizer:
  """Turn raw text into character ids and a fixed-length tensor of word ids.

  Args:
    char_vocab: mapping from character to id; must also contain the special
      keys "<UNK>", "<SOS>" and "<EOS>".
    word_vocab: mapping from word to id; must contain "<PAD>" and should
      contain "<UNK>".
    chars: list of accepted characters. Stored on the instance but not
      consulted during tokenization.
    word_seq_len: length of the returned word-id tensor (default 90, the
      value that was previously hard-coded).

  Calling the tokenizer returns ``char_ids`` when ``label=True`` and
  ``(char_ids, word_ids)`` otherwise. Both are ``torch.long`` tensors, even
  when they are empty.
  """

  # Accented characters folded to ASCII before lower-casing. Built once here
  # instead of on every call.
  _REPLACEMENTS = str.maketrans({
    'İ': 'I',
    'ç': 'c',
    'ş': 's',
    'ü': 'u',
    'ö': 'o',
    'ğ': 'g',
    'ı': 'i'
  })

  # Sentinel characters used in the corpus and the special tokens they map to.
  _SENTINELS = {"#": "<SOS>", "*": "<EOS>"}

  def __init__(self,char_vocab,word_vocab,chars,word_seq_len=90):
    self.char_vocabulary = char_vocab
    self.word_vocabulary = word_vocab

    self.chars = chars
    self.word_seq_len = word_seq_len

  def __call__(self,text,label=False):
    """Tokenize ``text``; see the class docstring for the return value."""
    lowered = text.translate(self._REPLACEMENTS).lower()
    char_tokens = self._encode_chars(lowered)
    if label:
      return char_tokens
    return char_tokens, self._encode_words(lowered)

  def _unknown_word_id(self):
    """Id used for out-of-vocabulary words.

    Prefers the word vocabulary's own "<UNK>" entry. Falls back to the
    character vocabulary's "<UNK>" id (the previous behaviour) only when the
    word vocabulary has no such entry.
    """
    if "<UNK>" in self.word_vocabulary:
      return self.word_vocabulary["<UNK>"]
    return self.char_vocabulary["<UNK>"]

  def _encode_chars(self, lowered):
    """Map every character of the already lower-cased text to its id."""
    ids = []
    for char in lowered:
      if char in self._SENTINELS:
        # '#' -> <SOS>, '*' -> <EOS>
        ids.append(self.char_vocabulary[self._SENTINELS[char]])
      elif char in self.char_vocabulary:
        ids.append(self.char_vocabulary[char])
      else:
        ids.append(self.char_vocabulary["<UNK>"])
    # Explicit dtype so an empty input still yields an integer tensor.
    return torch.tensor(ids, dtype=torch.long)

  def _encode_words(self, lowered):
    """Map space-separated words to ids and left-pad to ``word_seq_len``."""
    unk_id = self._unknown_word_id()
    ids = []
    for raw_word in lowered.split(" "):
      word = raw_word.strip().replace('"','').replace("\n",' ')
      word = re.sub(r'[.,()]', '', word)
      ids.append(self.word_vocabulary.get(word, unk_id))

    # Drop a leading unknown word (presumably because sliding windows usually
    # start in the middle of a word -- TODO confirm intent).
    if ids and ids[0] == unk_id:
      ids = ids[1:]

    # Keep only the most recent words when there are too many; this is what
    # the previous negative left-padding did implicitly.
    overflow = len(ids) - self.word_seq_len
    if overflow > 0:
      ids = ids[overflow:]

    word_tokens = torch.tensor(ids, dtype=torch.long)
    return F.pad(word_tokens, (self.word_seq_len-len(ids),0), "constant", self.word_vocabulary["<PAD>"])




In [16]:
# Smoke test of the tokenizer on a short string that contains a character
# ('[') which is not part of accepted_text.
tokenizer = Tokenizer(vocabulary,word_vocabulary,chars)
tokenizer("Davud je haf[iz")

(tensor([ 5,  2, 23, 22,  5, 53, 11,  6, 53,  9,  2,  7,  1, 10, 27]),
 tensor([    0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
             0,     0,     0,     0,     0,     0,     0,     0, 28389,     1]))

In [17]:
class TextDataset(Dataset):
  """Sliding-window next-character dataset over one long string.

  Item ``idx`` is the pair ``(x, y)`` where ``x`` is the ``seq_len``
  characters starting at ``idx`` and ``y`` is the same window shifted one
  character to the right.

  Args:
    texts: the corpus as a single string.
    tokenizer: optional callable. When given, ``x`` becomes ``tokenizer(x)``
      and ``y`` becomes ``tokenizer(y, label=True).view(-1)``. When ``None``
      the raw string slices are returned.
    seq_len: window length in characters.
  """

  def __init__(self, texts, tokenizer=None, seq_len=90):
    self.tokenizer=tokenizer
    self.text=texts
    self.seq_len=seq_len

  def __len__(self):
    # Clamp at zero: a negative length makes len() raise ValueError when the
    # text is shorter than one window.
    return max(0, len(self.text)-self.seq_len)

  def __getitem__ (self, idx):
    """Return the ``(x, y)`` pair for window ``idx``.

    Negative indices count from the end.

    Raises:
      IndexError: if ``idx`` is outside ``[-len(self), len(self))``.
    """
    size = len(self)
    if idx < 0:
      idx += size
    if not 0 <= idx < size:
      raise IndexError(f"TextDataset index out of range (size {size})")

    stop = idx + self.seq_len
    x = self.text[idx:stop]
    y = self.text[idx+1:stop+1]
    if self.tokenizer is not None:
      x=self.tokenizer(x)
      y=self.tokenizer(y,label=True).view(-1)
    return x,y

In [18]:
test_size = 0.15
# Index of the first character that belongs to the held-out tail of the corpus.
split_index = int(len(texts)*(1-test_size))
train_text, test_text = texts[:split_index], texts[split_index:]

In [19]:
# Sliding-window datasets over the train and test character streams
# (TextDataset's default seq_len of 90 is used for both).
train_dataset=TextDataset(train_text,tokenizer=tokenizer)
test_dataset=TextDataset(test_text,tokenizer=tokenizer)

In [20]:
# drop_last=True keeps every batch at exactly 32 samples, which the later
# `.view(32, -1)` calls rely on.
# NOTE(review): shuffle=False feeds consecutive, heavily overlapping windows
# in corpus order -- confirm this is intended for training.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=32, drop_last=True)

In [21]:
# Use the first CUDA device when one is available, otherwise fall back to CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cuda:0


In [28]:
class ASLenseRNN(nn.Module):
  """Character-level LSTM language model conditioned on word-level context.

  Character ids and word ids are embedded separately, concatenated along the
  feature dimension, passed through a multi-layer LSTM, and projected to
  per-position logits over the character vocabulary.

  Args:
    vocab_size: number of character tokens (size of the output layer).
    embed_size: embedding width used for BOTH the character and the word
      embedding; the LSTM therefore receives ``2 * embed_size`` features.
    n_hidden: LSTM hidden size.
    n_layers: number of stacked LSTM layers.
    embedding_matrix: accepted for interface compatibility but currently
      unused; the word embedding is always randomly initialised.
    word_vocab_size: number of rows of the word embedding (default 42605, the
      value that was previously hard-coded).
    word_pad_idx: padding index of the word embedding. When ``None`` it is
      read from the notebook-level ``word_vocabulary["<PAD>"]``, as before.
  """

  def __init__(self,vocab_size,embed_size,n_hidden,n_layers,embedding_matrix,
               word_vocab_size=42605,word_pad_idx=None):
    super().__init__()
    self.V = vocab_size
    self.D = embed_size
    self.M = n_hidden
    self.L = n_layers

    if word_pad_idx is None:
      # Backward-compatible default: look the id up in the global vocabulary.
      word_pad_idx = word_vocabulary["<PAD>"]

    # Attribute names and creation order are kept so existing checkpoints
    # (state_dict keys) and seeded initialisation stay compatible.
    self.embedding = nn.Embedding(self.V,self.D)
    self.word2vec = nn.Embedding(word_vocab_size,self.D,padding_idx=word_pad_idx)

    # Recurrent core; input is the concatenation of both embeddings.
    self.rnn = nn.LSTM(input_size=self.D*2,
                      hidden_size=self.M,
                      num_layers=self.L,
                      dropout=0.2,
                      batch_first=True)

    # Projection from LSTM features to character logits.
    self.fc = nn.Sequential(
          nn.Linear(self.M, 1024),
          nn.ReLU(),
          nn.Linear(1024, self.V),
        )

  def forward(self, X, hidden):
    """Run the model on a pair of id tensors.

    Args:
      X: pair ``(char_ids, word_ids)`` of integer tensors whose shapes match
        (they are concatenated along the last dimension after embedding).
        With ``batch_first=True`` the layout is ``(batch, seq)``.
      hidden: initial ``(h, c)`` LSTM state, or ``None`` for a zero state.

    Returns:
      ``(logits, (h, c))``. ``logits`` holds one row of ``vocab_size`` scores
      per input position. The returned state is detached from the graph so it
      can be fed back in without backpropagating through earlier batches.
    """
    char_embed = self.embedding(X[0])
    word_embed = self.word2vec(X[1])
    features = torch.cat([char_embed, word_embed], dim=-1)
    features,hidden_state = self.rnn(features,hidden)
    logits = self.fc(features)
    return logits,(hidden_state[0].detach(), hidden_state[1].detach())


In [29]:
# Build the network and move it to the selected device.
# NOTE(review): vocab_size=57 is presumably the number of entries in
# char_vocab.json -- verify, or derive it from len(vocabulary).
# embedding_matrix=None: the class does not use this argument.
model = ASLenseRNN(vocab_size=57,
            embed_size=300,
            n_hidden=128*3,
            n_layers=3,
            embedding_matrix=None)
model.to(device)

ASLenseRNN(
  (embedding): Embedding(57, 300)
  (word2vec): Embedding(42605, 300, padding_idx=0)
  (rnn): LSTM(600, 384, num_layers=3, batch_first=True, dropout=0.2)
  (fc): Sequential(
    (0): Linear(in_features=384, out_features=1024, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1024, out_features=57, bias=True)
  )
)

In [31]:
# Restore the weights stored under 'model_state_dict' in the checkpoint.
# NOTE(review): weights_only=False lets torch.load unpickle arbitrary Python
# objects -- only load checkpoint files from a trusted source.
model.load_state_dict(torch.load('model_checkpoint.pt',map_location=device,weights_only=False)['model_state_dict'])

<All keys matched successfully>

In [32]:
# Shape sanity check: push the first training batch through the model once.
for (char_batch, word_batch), _labels in train_loader:
  print(char_batch.shape,word_batch.shape)
  char_batch = char_batch.view(32,-1).to(device)
  word_batch = word_batch.view(32,-1).to(device)
  logits, _state = model((char_batch, word_batch), None)
  print(logits.shape)
  break

torch.Size([32, 90]) torch.Size([32, 90])
torch.Size([32, 90, 57])


## Train model

In [34]:
# Per-character classification loss over the character vocabulary.
criterion = nn.CrossEntropyLoss()
# Adam with a small learning rate and L2 weight decay.
optimizer = torch.optim.Adam(model.parameters(),lr=0.0001, weight_decay=1e-4)

In [35]:
from helper_functions import progress_bar, plot_loss_curves,SaveModelCheckpoint

In [36]:
# Checkpoint helper from helper_functions; the training loop calls it once per
# epoch and stores its return value as the new best validation loss.
save_model_checkpoint = SaveModelCheckpoint(path="model_checkpoint_ft.pt")
best_val_loss=float('inf')

In [37]:
print(len(train_loader))

76175


In [None]:
# Training / validation loop.
# Every batch starts from a zero LSTM state (hidden=None), so windows are
# treated as independent samples.
epoches=10
train_losses = np.zeros(epoches)
val_losses = np.zeros(epoches)
for it in range(epoches):
  t0 = datetime.now()
  train_loss=[]
  val_loss=[]

  # ---- training pass ----
  model.train() # set model to train mode
  current_batch = 0
  total_batches = len(train_loader)
  for inputs,targets in train_loader:
    # move data to the device; .long() guards against a batch that was
    # collated to a floating dtype
    char_ids = inputs[0].to(device).long()
    word_ids = inputs[1].to(device).long()
    targets = targets.to(device)
    # derive the batch size from the data instead of hard-coding 32
    batch_size = char_ids.size(0)

    optimizer.zero_grad()
    outputs,_ = model((char_ids.view(batch_size,-1),word_ids.view(batch_size,-1)),None)
    # flatten (batch, seq, vocab) -> (batch*seq, vocab) for CrossEntropyLoss
    outputs = outputs.view(-1, outputs.size(-1))
    loss = criterion(outputs,targets.view(-1))

    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # Gradient clipping
    optimizer.step()

    train_loss.append(loss.item())
    current_batch = progress_bar(current_batch,total_batches)

  # ---- validation pass ----
  model.eval() # set model to eval mode
  current_batch = 0
  total_batches = len(test_loader)
  # no_grad: validation needs no autograd graph, which saves memory and time
  with torch.no_grad():
    for inputs,targets in test_loader:
      char_ids = inputs[0].to(device).long()
      word_ids = inputs[1].to(device).long()
      targets = targets.to(device)
      batch_size = char_ids.size(0)

      outputs,_ = model((char_ids.view(batch_size,-1),word_ids.view(batch_size,-1)),None)
      outputs = outputs.view(-1, outputs.size(-1))
      loss = criterion(outputs,targets.view(-1))

      val_loss.append(loss.item())
      current_batch = progress_bar(current_batch,total_batches,validation=True)

  # ---- epoch summary ----
  print('\r')
  train_loss = np.mean(train_loss)
  val_loss = np.mean(val_loss)
  best_val_loss=  save_model_checkpoint(val_loss,best_val_loss,train_loss,it, model=model, optimizer=optimizer)

  train_losses[it]=train_loss
  val_losses[it]=val_loss
  dt = datetime.now() - t0
  print(f"Epoch {it+1}/{epoches}, Train loss: {train_loss:.4f}, Val loss: {val_loss:.4f}, Duration: {dt}")
  print('-------------------------------------------------------------')

[92m[1mModel saved at epoch: 1, val_loss improved from: inf to: 1.2900[0m
Epoch 1/10, Train loss: 1.2342, Val loss: 1.2900, Duration: 1:06:24.908837
-------------------------------------------------------------
[92m[1mModel saved at epoch: 2, val_loss improved from: 1.2900 to: 1.2695[0m
Epoch 2/10, Train loss: 1.1865, Val loss: 1.2695, Duration: 1:09:18.140698
-------------------------------------------------------------
[92m[1mModel saved at epoch: 3, val_loss improved from: 1.2695 to: 1.2619[0m
Epoch 3/10, Train loss: 1.1719, Val loss: 1.2619, Duration: 1:10:37.726495
-------------------------------------------------------------

In [36]:
import json

# Persist the word-level vocabulary as pretty-printed JSON.
with open('word_vocab.json', 'w') as vocab_file:
    json.dump(word_vocabulary, vocab_file, indent=4)

In [37]:
# Persist the character-level vocabulary as pretty-printed JSON.
with open('char_vocab.json', 'w') as vocab_file:
    json.dump(vocabulary, vocab_file, indent=4)

In [43]:
# Load the saved embedding matrix so the next cell can compare it with the
# in-memory `embedding_matrix`.
ek=torch.load("embedding_matrix.pt")

In [46]:
# Element-wise comparison of the two matrices; [0] displays only the first row.
# NOTE(review): `embedding_matrix` is not defined in any cell visible in this
# notebook -- this cell depends on leftover kernel state.
np.isclose(embedding_matrix,ek)[0]

array([ True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,  True,  True,  True,  True,  True,  True,  True,
        True,  True,

In [None]:
# Stand-alone validation pass over the test loader.
model.eval()  # disable LSTM dropout while evaluating
with torch.no_grad():  # no autograd graph is needed for evaluation
    for inputs,targets in test_loader:
        # move data to the device
        char_ids = inputs[0].to(device).long()
        word_ids = inputs[1].to(device).long()
        targets = targets.to(device)
        batch_size = char_ids.size(0)

        # forward pass
        outputs,hidden_state = model((char_ids.view(batch_size,-1),word_ids.view(batch_size,-1)),None)
        # The model returns one row of logits per position, so flatten both
        # tensors exactly as the training loop does before computing the loss.
        loss = criterion(outputs.view(-1, outputs.size(-1)),targets.view(-1))

In [39]:
train_loss,val_loss

(1.3491756223337092, 1.7551075440319523)

In [40]:
# Save a manual checkpoint with model/optimizer state and the latest losses.
# NOTE(review): 'epoch' is hard-coded to 30 and does not come from the
# training loop's counter -- confirm it matches the run being saved.
torch.save({
                    'epoch': 30,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'train_loss': train_loss,
                    'val_loss': val_loss,
                    }, "model.pt")

In [47]:
# Run the model once on a single prompt.
# Names avoid `input`, which would shadow the Python builtin for the rest of
# the session.
prompt_chars, prompt_words = tokenizer("How are you toda")
# Left-pad the character ids to the 90-character window the model was trained on.
prompt_chars_padded = F.pad(prompt_chars, (90-len(prompt_chars),0), "constant", vocabulary["<PAD>"])
print(prompt_chars.shape,prompt_words.shape)
model.eval()  # disable LSTM dropout so the prediction is deterministic
with torch.no_grad():
  out=model((prompt_chars_padded.view(1,-1).long().to(device),prompt_words.view(1,-1).long().to(device)),None)

torch.Size([16]) torch.Size([90])


In [48]:
out[0].shape

torch.Size([1, 57])

In [49]:
# Inverse of the character vocabulary (id -> character), used to decode
# predicted ids back into text.
flipped_dict = {v: k for k, v in vocabulary.items()}


In [50]:
# Decode the prediction for the LAST position instead of looking up a
# hard-coded id. Flattening to (positions, vocab) and taking the final row
# works whether the logits are (1, vocab) or (1, seq, vocab).
last_step_logits = out[0].reshape(-1, out[0].size(-1))[-1]
flipped_dict[torch.argmax(last_step_logits).detach().cpu().item()]

'y'

In [51]:
import time
# Greedy generation: append the most likely next character 15 times.
k="How are you toda"
hidden_state=None
model.eval()  # disable LSTM dropout during generation
with torch.no_grad():
  for i in range(15):
    char_ids, word_ids = tokenizer(k)
    # Left-pad the character ids to the 90-character training window.
    char_ids = F.pad(char_ids, (90-len(char_ids),0), "constant", vocabulary["<PAD>"])
    outputs,hidden_state = model((char_ids.view(1,-1).long().to(device),word_ids.view(1,-1).long().to(device)),None)
    # Use only the logits of the last position; an argmax over the whole
    # flattened tensor would return a flat index, not a character id.
    last_step_logits = outputs.reshape(-1, outputs.size(-1))[-1]
    next_id = torch.argmax(last_step_logits).item()
    k = k+flipped_dict[next_id]
print(k)

How are you today and streamers


In [None]:
def next_char(text, temperature=1):
    """Return the single most likely next character for ``text``.

    Uses the notebook-level ``tokenizer``, ``model``, ``vocabulary``,
    ``flipped_dict`` and ``device``. The character ids are left-padded to 90
    positions before being fed to the model.

    Args:
        text: prompt to continue.
        temperature: accepted for interface compatibility. Decoding is greedy
            (argmax), which a positive temperature cannot change, so the
            value currently has no effect.

    Returns:
        The predicted next character as looked up in ``flipped_dict``.
    """
    model.eval()  # disable LSTM dropout so repeated calls are deterministic
    with torch.no_grad():
        char_ids, word_ids = tokenizer(text)
        char_ids = F.pad(char_ids, (90-len(char_ids),0), "constant", vocabulary["<PAD>"])
        logits, _ = model((char_ids.view(1,-1).long().to(device),word_ids.view(1,-1).long().to(device)),None)

    # Keep only the logits of the last position. Flattening first makes this
    # work for both (1, vocab) and (1, seq, vocab) outputs; an argmax over the
    # whole tensor would yield a flat index rather than a character id.
    last_step_logits = logits.reshape(-1, logits.size(-1))[-1]
    char_id = torch.argmax(last_step_logits).item()
    return flipped_dict[char_id]
def extend_text(text, n_chars=100, temperature=1):
    """Extend ``text`` by ``n_chars`` predicted characters.

    Prints the result with the original prompt highlighted in green (ANSI
    escape codes) and returns the full extended string.

    Args:
        text: prompt to continue.
        n_chars: number of characters to append.
        temperature: forwarded to ``next_char``.

    Returns:
        The prompt followed by the generated characters.
    """
    ok_green = '\033[92m'
    end_colour = '\033[0m'

    first_len=len(text)
    for _ in range(n_chars):
        text += next_char(text, temperature)
    print(f"{ok_green}{text[:first_len]}{end_colour}{text[first_len:]}")
    # Return the text as well so callers can use it rather than receiving None.
    return text

In [None]:
k="The red part is quite hard to ch"

# extend_text prints the result itself; wrapping it in print() only added a
# stray "None" line. The trailing ';' suppresses any echoed return value.
extend_text(k, temperature=1);


[92mThe red part is quite hard to ch[0maracter and the first contructionaly

externally

externally

externally

externally

externally

ex
None
