<a href="https://colab.research.google.com/github/aishanisingh/LiveConv/blob/main/engspa_and_spaeng_translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Translator**: Spanish-to-English & English-to-Spanish translation built using a GRU encoder-decoder with Bahdanau attention

## Training

In [5]:
import os,io
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import tensorflow as tf
import warnings
warnings.filterwarnings('ignore')

# Configuration constants.
# TRANSLATION_PATH determines whether the model learns Spanish-to-English ("spaeng")
# or English-to-Spanish ("engspa") translation.
# Once the epochs are finished, the weights are stored in Google Drive. During inference,
# the weights corresponding to the detected input language are loaded.

TRANSLATION_PATH="engspa"
#TRANSLATION_PATH="spaeng"
# Number of passes over the training set.
EPOCHS=10

In [5]:
# Fetch the Spanish-English sentence-pair archive and extract it.
zip_file = tf.keras.utils.get_file(
    fname='spa-eng.zip',
    origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
    extract=True,
)

# The sentence pairs are expected in spa-eng/spa.txt next to the downloaded zip.
file_path = os.path.dirname(zip_file)+"/spa-eng/spa.txt"

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


In [6]:
! head -10 /root/.keras/datasets/spa-eng/spa.txt

Go.	Ve.
Go.	Vete.
Go.	Vaya.
Go.	Váyase.
Hi.	Hola.
Run!	¡Corre!
Run.	Corred.
Who?	¿Quién?
Fire!	¡Fuego!
Fire!	¡Incendio!


In [7]:
def load_data(path, size=None):
    """Read tab-separated sentence pairs and split them into source/target arrays.

    Args:
        path: Path to a UTF-8 text file with one pair per line, the two
            sentences separated by a tab (first column, then second column).
        size: Optional maximum number of pairs to keep, counted from the start
            of the file. ``None`` (the default) keeps every pair.

    Returns:
        ``(source, target)`` as numpy string arrays. The module-level
        ``TRANSLATION_PATH`` selects the direction: ``"spaeng"`` uses the second
        column as source and the first as target; any other value uses the
        first column as source and the second as target.
    """
    # Read from the `path` argument (not the notebook-level `file_path`) and
    # make sure the file handle is closed.
    with io.open(path, encoding='UTF-8') as handle:
        lines = handle.read().splitlines()

    # Keep only the first two fields and skip lines that do not contain a pair,
    # so blank lines or extra columns do not break the unpacking below.
    pairs = [line.split('\t')[:2] for line in lines if '\t' in line]
    if size is not None:
        pairs = pairs[:size]

    first_column = [pair[0] for pair in pairs]
    second_column = [pair[1] for pair in pairs]

    if TRANSLATION_PATH == "spaeng":
        source = np.array(second_column)  # extract source text into a numpy array
        target = np.array(first_column)   # extract target text into a numpy array
    else:
        source = np.array(first_column)   # extract source text into a numpy array
        target = np.array(second_column)  # extract target text into a numpy array

    return source, target

In [8]:
# Load the raw sentence pairs and print one example pair (index 42) as a sanity check.
src_sentences, tgt_sentences = load_data(file_path)
print("Original Sentence:",src_sentences[42])
print("Translated Sentence:",tgt_sentences[42])

Original Sentence: I know.
Translated Sentence: Yo lo sé.


In [9]:
import re, itertools
from collections import Counter
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

In [10]:
import unicodedata

def unicode_to_ascii(s):
    """Return ``s`` with combining accent marks removed.

    The text is first decomposed with NFD so that accents become separate
    combining characters (Unicode category ``Mn``); those are then dropped.
    Characters that do not decompose (for example ``¿``) are left untouched.
    """
    kept = []
    for ch in unicodedata.normalize('NFD', s):
        if unicodedata.category(ch) == 'Mn':
            continue
        kept.append(ch)
    return ''.join(kept)

def preprocess_text(text):
  """Normalize a raw sentence and wrap it in ``<sos>`` / ``<eos>`` markers.

  The sentence is lowercased, trimmed and stripped of accents; every run of
  characters other than ASCII letters and ``? . ! , ¿`` becomes a single
  space; those punctuation marks are padded with spaces so they form their
  own tokens; repeated spaces are collapsed; finally the start and end
  markers are added.
  """
  cleaned = unicode_to_ascii(text.lower().strip())

  # Applied in order: drop unsupported characters, isolate punctuation,
  # collapse the extra spaces produced by the first two steps.
  substitutions = (
      (r"[^a-zA-Z?.!,¿]+", " "),
      (r"([?.!,¿])", r" \1 "),
      (r'[" "]+', " "),
  )
  for pattern, replacement in substitutions:
    cleaned = re.sub(pattern, replacement, cleaned)

  return '<sos> ' + cleaned.strip() + ' <eos>'

In [11]:
# Show one sentence before and after preprocessing, and preprocess both sides of the corpus.
print('Original sentence:',src_sentences[42])
prc_src_sentences = [preprocess_text(w) for w in src_sentences]
prc_tgt_sentences = [preprocess_text(w) for w in tgt_sentences]
print('Preprocessed sentence:',prc_src_sentences[42])

Original sentence: I know.
Preprocessed sentence: <sos> i know . <eos>


In [12]:
def tokenize(sentences):
    """Fit a word-level tokenizer on ``sentences`` and return padded id sequences.

    Returns:
        A tuple ``(sequences, lang_tokenizer, max_length)``: the id sequences
        padded at the end to a common length, the fitted tokenizer, and the
        token count of the longest sentence.
    """
    # filters='' disables the tokenizer's default character filtering.
    lang_tokenizer = Tokenizer(filters='')
    lang_tokenizer.fit_on_texts(sentences)

    id_lists = lang_tokenizer.texts_to_sequences(sentences)
    max_length = max(map(len, id_lists))
    padded = pad_sequences(
        id_lists,
        maxlen=max_length,
        padding='post',
        truncating='post',
    )
    return padded, lang_tokenizer, max_length

In [13]:
def load_sequences(path, size=None):
    """Load, preprocess and tokenize the sentence pairs found at ``path``.

    Args:
        path: Path of the tab-separated corpus file, passed to ``load_data``.
        size: Optional limit on the number of pairs, forwarded to ``load_data``
            (previously accepted but never used). ``None`` keeps the default
            behaviour of ``load_data``.

    Returns:
        ``(src_sequences, tgt_sequences, src_lang_tokenizer,
        tgt_lang_tokenizer, max_length_src, max_length_trg)`` as produced by
        calling ``tokenize`` on the preprocessed source and target sentences.
    """
    src_sentences, tgt_sentences = load_data(path, size)
    src_sentences = [preprocess_text(w) for w in src_sentences]
    tgt_sentences = [preprocess_text(w) for w in tgt_sentences]

    # Each language gets its own tokenizer and its own padded length.
    src_sequences,src_lang_tokenizer,max_length_src = tokenize(src_sentences)
    tgt_sequences,tgt_lang_tokenizer,max_length_trg = tokenize(tgt_sentences)

    return src_sequences, tgt_sequences, src_lang_tokenizer, tgt_lang_tokenizer, max_length_src, max_length_trg


In [14]:
# Build the padded id sequences, the per-language tokenizers and the padded lengths.
(
    src_sequences,
    tgt_sequences,
    src_lang_tokenizer,
    tgt_lang_tokenizer,
    max_length_src,
    max_length_trg,
) = load_sequences(file_path)

In [15]:
# Size of the input and output vocabularies: one more than the number of
# distinct words seen by each tokenizer (id 0 is treated as padding by the
# loss mask and the encoder embedding later in the notebook).
src_vocab_size = len(src_lang_tokenizer.word_index)+1
tgt_vocab_size = len(tgt_lang_tokenizer.word_index)+1
print(src_vocab_size)
print(tgt_vocab_size)

12934
24794


In [16]:
# The corpus appears to be ordered by sentence length (its first rows are
# one-word pairs). An unshuffled split would therefore train on the shortest
# sentences and validate only on the longest ones, so the validation loss would
# not reflect the data the model was trained on. Shuffle before splitting, with a
# fixed seed so the split is reproducible across runs.
source_sequences_train,source_sequences_val,tgt_sequences_train,tgt_sequences_val = train_test_split(src_sequences,
                                                                                                     tgt_sequences,
                                                                                                     shuffle=True,
                                                                                                     random_state=42,
                                                                                                     test_size=0.2)
print(len(source_sequences_train),len(source_sequences_val),len(tgt_sequences_train),len(tgt_sequences_val))

95171 23793 95171 23793


In [17]:
# Model and batching hyperparameters.
BATCH_SIZE = 64
embedding_dim = 128
units = 1024

# Number of examples in each split; the training count is also used as the shuffle buffer size.
buffer_size = len(source_sequences_train)
val_buffer_size = len(source_sequences_val)

# Whole batches per epoch (any final partial batch is not counted).
steps_per_epoch = buffer_size // BATCH_SIZE
val_steps_per_epoch = val_buffer_size // BATCH_SIZE

In [18]:
# Training pipeline: (source, target) pairs, shuffled with a buffer the size of
# the whole training split, then grouped into batches of BATCH_SIZE.
train_dataset = tf.data.Dataset.from_tensor_slices((source_sequences_train, tgt_sequences_train))

train_dataset = train_dataset.shuffle(buffer_size=buffer_size).batch(BATCH_SIZE)

# Validation pipeline: same pairing and batching, without shuffling.
val_dataset = tf.data.Dataset.from_tensor_slices((source_sequences_val, tgt_sequences_val))

val_dataset = val_dataset.batch(BATCH_SIZE)

# Pull one training batch to inspect shapes; it is reused by the model smoke test below.
example_input_batch, example_target_batch = next(iter(train_dataset))
example_input_batch.shape, example_target_batch.shape

(TensorShape([64, 51]), TensorShape([64, 53]))

In [19]:
class Encoder(tf.keras.Model):
    """GRU encoder: embeds source token ids and returns per-step outputs plus the final state."""

    def __init__(self, vocab_size, emb_dim, enc_units, batch_sz):
        """Create the embedding and GRU layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            emb_dim: Embedding vector size.
            enc_units: GRU state size.
            batch_sz: Batch size used by ``initialize_hidden_state``.
        """
        super().__init__()
        self.batch_sz = batch_sz
        self.enc_units = enc_units
        # mask_zero=True makes the embedding emit a mask for id 0 (padding).
        self.embedding = tf.keras.layers.Embedding(
            vocab_size, emb_dim, mask_zero=True)
        self.gru = tf.keras.layers.GRU(
            self.enc_units,
            return_sequences=True,
            return_state=True,
            recurrent_initializer='glorot_uniform',
        )

    def initialize_hidden_state(self):
        """Return an all-zero state of shape ``(batch_sz, enc_units)``."""
        return tf.zeros((self.batch_sz, self.enc_units))

    def call(self, x, hidden):
        """Encode token ids ``x`` starting from ``hidden``; return ``(output, state)``."""
        embedded = self.embedding(x)
        output, state = self.gru(embedded, initial_state=hidden)
        return output, state

class BahdanauAttention(tf.keras.layers.Layer):
  """Additive attention: scores each encoder step against a query vector."""

  def __init__(self, units):
    """Create the three dense projections; ``units`` is the width of W1 and W2."""
    super().__init__()
    self.W1 = tf.keras.layers.Dense(units)  # projects the query
    self.W2 = tf.keras.layers.Dense(units)  # projects the values
    self.V = tf.keras.layers.Dense(1)       # reduces each step to one score

  def call(self, query, values):
    """Return ``(context_vector, attention_weights)`` for ``query`` over ``values``.

    The weights are a softmax over axis 1 of ``values``; the context vector is
    the weighted sum of ``values`` along that axis.
    """
    # Insert an axis at position 1 so the projected query broadcasts against the values.
    expanded_query = tf.expand_dims(query, 1)
    energy = tf.nn.tanh(self.W1(expanded_query) + self.W2(values))
    score = self.V(energy)

    attention_weights = tf.nn.softmax(score, axis=1)
    context_vector = tf.reduce_sum(attention_weights * values, axis=1)
    return context_vector, attention_weights

class Decoder(tf.keras.Model):
  """GRU decoder with Bahdanau attention that scores the next target token."""

  def __init__(self, vocab_size, emb_dim, dec_units, batch_sz):
    """Create the attention, embedding, GRU and output layers.

    Args:
      vocab_size: Size of the embedding table and of the output layer.
      emb_dim: Embedding vector size.
      dec_units: GRU state size, also used as the attention width.
      batch_sz: Batch size (stored on the instance).
    """
    super(Decoder, self).__init__()
    self.batch_sz = batch_sz
    self.dec_units = dec_units
    self.attention = BahdanauAttention(self.dec_units)

    self.embedding = tf.keras.layers.Embedding(vocab_size, emb_dim)

    self.gru = tf.keras.layers.GRU(self.dec_units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')
    self.fc = tf.keras.layers.Dense(vocab_size)

  def call(self, x, hidden, enc_output):
    """Run one decoding step.

    Args:
      x: Token ids fed to the decoder for this step.
      hidden: Previous decoder state; used as the attention query and as the
        GRU's initial state.
      enc_output: Encoder outputs attended over.

    Returns:
      ``(logits, state, attention_weights)``.
    """
    context_vector, attention_weights = self.attention(hidden, enc_output)
    x = self.embedding(x)
    x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
    # Carry the previous decoder state into the GRU. Previously the GRU was
    # called without an initial state, so it restarted from zeros on every step
    # and `hidden` only influenced the attention query. This requires `hidden`
    # to have `dec_units` features (true in this notebook, where encoder and
    # decoder share `units`). Checkpoints trained before this change should be
    # retrained, since their weights were learned without state carry-over.
    output, state = self.gru(x, initial_state=hidden)
    output = tf.reshape(output, (-1, output.shape[2]))
    x = self.fc(output)
    return x, state , attention_weights

In [20]:
# Instantiate the encoder and run it once on the example batch to check output shapes.
encoder = Encoder(src_vocab_size, embedding_dim, units, BATCH_SIZE)

sample_hidden = encoder.initialize_hidden_state()
sample_output, sample_hidden = encoder(example_input_batch, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print ('Encoder Hidden state shape: (batch size, units) {}'.format(sample_hidden.shape))

# Stand-alone attention layer (width 20) used only for this shape check.
attention_layer = BahdanauAttention(20)
attention_result, attention_weights = attention_layer(sample_hidden, sample_output)

print("Attention result shape (context vector): (batch size, units) {}".format(attention_result.shape))
print("Attention weights shape: (batch_size, sequence_length, 1) {}".format(attention_weights.shape))

# Instantiate the decoder and run a single step on random input to check the output shape.
decoder = Decoder(tgt_vocab_size, embedding_dim, units, BATCH_SIZE)
sample_decoder_output, _, _ = decoder(tf.random.uniform((BATCH_SIZE, 1)),
                                      sample_hidden, sample_output)

print ('Decoder output shape: (batch_size, vocab size) {}'.format(sample_decoder_output.shape))


Encoder output shape: (batch size, sequence length, units) (64, 51, 1024)
Encoder Hidden state shape: (batch size, units) (64, 1024)
Attention result shape (context vector): (batch size, units) (64, 1024)
Attention weights shape: (batch_size, sequence_length, 1) (64, 51, 1)
Decoder output shape: (batch_size, vocab size) (64, 24794)


In [21]:
# Adam with default settings; train_step applies it to encoder and decoder variables together.
optimizer = tf.keras.optimizers.Adam()

# from_logits=True: the decoder's final Dense layer has no activation.
# reduction='none': keep unreduced losses so loss_function can mask padding before averaging.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')



def loss_function(real, pred):
  """Cross-entropy between ``real`` ids and ``pred`` logits, ignoring padding.

  Entries where ``real`` equals 0 contribute zero loss. The result is the
  mean over all entries, the zeroed ones included.
  """
  per_token = loss_object(real, pred)

  # 1.0 where the target is a real token, 0.0 where it is padding (id 0).
  keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_token.dtype)
  per_token *= keep

  return tf.reduce_mean(per_token)

# Checkpoints go to a folder named after the translation direction, with files prefixed "ckpt".
checkpoint_dir = './training_checkpoints/'+TRANSLATION_PATH
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
# Track the optimizer together with both sub-models so they are saved and restored as one unit.
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                 encoder=encoder,
                                 decoder=decoder)

In [22]:
@tf.function
def train_step(inp, targ, enc_hidden):
  """Run one teacher-forced training step on a batch and update the weights.

  Args:
    inp: Batch of source id sequences.
    targ: Batch of target id sequences; column 0 is never scored, columns 1..
      are predicted one at a time.
    enc_hidden: Initial encoder state.

  Returns:
    The summed per-step loss divided by the number of target columns.
  """
  sos_id = tgt_lang_tokenizer.word_index['<sos>']
  num_steps = targ.shape[1]
  loss = 0

  with tf.GradientTape() as tape:
    enc_output, enc_hidden = encoder(inp, enc_hidden)

    # The decoder starts from the encoder's final state and a column of <sos> ids.
    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([sos_id] * BATCH_SIZE, 1)

    for t in range(1, num_steps):
      target_t = targ[:, t]
      predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
      loss += loss_function(target_t, predictions)
      # Teacher forcing: the ground-truth token becomes the next decoder input.
      dec_input = tf.expand_dims(target_t, 1)

  variables = encoder.trainable_variables + decoder.trainable_variables

  # Gradients are taken of the summed loss, not the averaged value that is returned.
  gradients = tape.gradient(loss, variables)
  optimizer.apply_gradients(zip(gradients, variables))

  return loss / int(num_steps)

In [23]:
@tf.function
def val_step(inp, targ, enc_hidden):
    """Compute the teacher-forced loss on a batch without updating any weights.

    Args:
        inp: Batch of source id sequences.
        targ: Batch of target id sequences; columns 1.. are scored.
        enc_hidden: Initial encoder state.

    Returns:
        The summed per-step loss divided by the number of target columns.
    """
    sos_id = tgt_lang_tokenizer.word_index['<sos>']
    num_steps = targ.shape[1]

    enc_output, enc_hidden = encoder(inp, enc_hidden)
    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([sos_id] * BATCH_SIZE, 1)

    loss = 0
    for t in range(1, num_steps):
        target_t = targ[:, t]
        predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
        loss += loss_function(target_t, predictions)
        # Feed the ground-truth token as the next input, as in training.
        dec_input = tf.expand_dims(target_t, 1)

    return loss / int(num_steps)

In [24]:
import time

def train_and_validate(train_dataset, val_dataset, EPOCHS):
    """Train for ``EPOCHS`` epochs, validating and reporting after each one.

    Each epoch runs ``steps_per_epoch`` training batches (logging every 100th
    batch) and ``val_steps_per_epoch`` validation batches, saves a checkpoint
    on every second epoch, and prints the mean losses and the elapsed time.
    """
    for epoch in range(EPOCHS):
        start = time.time()

        # One zero state per epoch, shared by the training and validation passes.
        enc_hidden = encoder.initialize_hidden_state()

        # Training pass.
        total_train_loss = 0
        training_batches = train_dataset.take(steps_per_epoch)
        for batch, (inp, targ) in enumerate(training_batches):
            batch_loss = train_step(inp, targ, enc_hidden)
            total_train_loss += batch_loss

            if batch % 100 != 0:
                continue
            print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1,
                                                        batch,
                                                        batch_loss.numpy()))

        # Validation pass.
        total_val_loss = 0
        for inp, targ in val_dataset.take(val_steps_per_epoch):
            total_val_loss += val_step(inp, targ, enc_hidden)

        is_even_epoch = (epoch + 1) % 2 == 0
        if is_even_epoch:
            checkpoint.save(file_prefix = checkpoint_prefix)

        print('Total training loss is {:.4f}'.format(total_train_loss / steps_per_epoch))
        print('Total validation loss is {:.4f}'.format( total_val_loss / val_steps_per_epoch))
        print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

In [25]:
# Train the model for EPOCHS epochs (checkpoints are saved every second epoch).
train_and_validate(train_dataset, val_dataset, EPOCHS)

In [37]:
from google.colab import drive
drive.mount('/content/drive')

# Create a folder in the root directory
!mkdir -p "/content/drive/My Drive/LiveConv/SpaEng2"
!mkdir -p "/content/drive/My Drive/LiveConv/EngSpa2"

if TRANSLATION_PATH == 'spaeng':
  !cp -rf "/content/training_checkpoints/spaeng/" "/content/drive/My Drive/LiveConv/SpaEng2/"
else:
  !cp -rf "/content/training_checkpoints/engspa/" "/content/drive/My Drive/LiveConv/EngSpa2/"

Mounted at /content/drive


In [27]:
def plot_attention(attention, sentence, predicted_sentence):
  """Draw an attention matrix as a heatmap labelled with tokens.

  Args:
    attention: 2-D array of weights; rows are labelled with
      ``predicted_sentence`` and columns with ``sentence``.
    sentence: Sequence of input tokens (x-axis labels).
    predicted_sentence: Sequence of output tokens (y-axis labels).
  """
  fig = plt.figure(figsize=(10, 10))
  ax = fig.add_subplot(1, 1, 1)
  ax.matshow(attention, cmap='viridis')
  fontdict = {'fontsize': 14}

  # Never label more cells than the matrix has.
  n_rows, n_cols = np.shape(attention)
  x_labels = list(sentence)[:n_cols]
  y_labels = list(predicted_sentence)[:n_rows]

  # Fix the tick positions first, then attach the labels. Setting labels on
  # automatically placed ticks (and shifting them with a leading '' entry)
  # depends on where the locator happens to put the first tick.
  ax.set_xticks(range(len(x_labels)))
  ax.set_yticks(range(len(y_labels)))
  ax.set_xticklabels(x_labels, fontdict=fontdict, rotation=90)
  ax.set_yticklabels(y_labels, fontdict=fontdict)
  plt.show()

In [28]:
def evaluate(sentence):
    """Greedily decode a translation for ``sentence``.

    Args:
        sentence: Raw input text in the source language.

    Returns:
        ``(result, sentence, attention_plot)``: the predicted tokens joined by
        spaces (with a trailing space), the preprocessed input restricted to
        in-vocabulary tokens, and a ``(max_length_trg, max_length_src)`` array
        holding the attention weights of each decoding step.
    """
    attention_plot = np.zeros((max_length_trg, max_length_src))

    vocab = src_lang_tokenizer.word_index
    tokens = [tok for tok in preprocess_text(sentence).split(' ') if tok]

    # Words that were never seen during training have no id. Skip them (and
    # say so) instead of failing with a KeyError, which matters for free-form
    # input such as transcribed speech.
    unknown = [tok for tok in tokens if tok not in vocab]
    if unknown:
        print('Skipping words not in the source vocabulary:', ' '.join(unknown))
    known = [tok for tok in tokens if tok in vocab]

    # Return exactly the tokens that were fed to the encoder so attention
    # columns stay aligned with their labels.
    sentence = ' '.join(known)

    inputs = [vocab[tok] for tok in known]
    inputs = pad_sequences([inputs],
                          maxlen=max_length_src,
                          padding='post')
    inputs = tf.convert_to_tensor(inputs)

    result = ''
    hidden = [tf.zeros((1, units))]
    enc_out, enc_hidden = encoder(inputs, hidden)
    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([tgt_lang_tokenizer.word_index['<sos>']], 0)

    for t in range(max_length_trg):
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                         dec_hidden,
                                                         enc_out)
        # Flatten the weights to one value per source position for plotting.
        attention_weights = tf.reshape(attention_weights, (-1, ))
        attention_plot[t] = attention_weights.numpy()

        # Greedy choice: take the highest-scoring token.
        predicted_id = tf.argmax(predictions[0]).numpy()
        result += tgt_lang_tokenizer.index_word[predicted_id] + ' '

        if tgt_lang_tokenizer.index_word[predicted_id] == '<eos>':
            return result, sentence, attention_plot

        # The prediction becomes the next decoder input.
        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention_plot

In [47]:
def translate(sentence):
  """Translate ``sentence``, print the result and plot the attention weights.

  Args:
    sentence: Raw input text in the source language.
  """
  result, sentence, attention_plot = evaluate(sentence)

  print()
  print('Input:', sentence)
  print('Predicted Translation:', result)

  # `result` ends with a space, so splitting on ' ' would add an empty token,
  # giving the plot an extra unlabeled row. Whitespace splitting without an
  # argument yields exactly the real tokens.
  input_tokens = sentence.split()
  output_tokens = result.split()

  attention_plot = attention_plot[:len(output_tokens),
                                 :len(input_tokens)]
  plot_attention(attention_plot, input_tokens, output_tokens)

In [42]:
#checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

# Restore the latest checkpoint saved on Drive for the configured direction.
# (The 'engspa' path previously lacked its closing quote, which made this cell a SyntaxError.)
if TRANSLATION_PATH == 'engspa':
  checkpoint.restore(tf.train.latest_checkpoint('/content/drive/My Drive/LiveConv/EngSpa2/'))
else:
  checkpoint.restore(tf.train.latest_checkpoint('/content/drive/My Drive/LiveConv/SpaEng2/'))

In [43]:
# Try the translator on a short question.
translate(u'are you still at home ?')

Input: <sos> are you still at home ? <eos>
Predicted Translation: ¿ todavia esta en casa ? <eos> 


In [44]:
# Try the translator on a short statement with mixed-case input.
translate(u'It is very cold today .')

Input: <sos> it is very cold today . <eos>
Predicted Translation: hoy hace mucho frio . <eos> 


# Using Chrome Audio and getting ready for Speech to Text (using SpeechRecognitionModel)

# Inference

In [None]:
# Install the packages needed for recording and transcribing audio.
# ffmpeg-python, pydub and openai-whisper are imported right below.
# NOTE(review): einops and huggingsound are not imported anywhere in the visible
# notebook - confirm they are still needed. Versions are not pinned.
!pip install einops
!pip install ffmpeg-python
!pip install huggingsound
!pip install pydub
!pip install -U openai-whisper

from IPython.display import HTML, Audio
from google.colab.output import eval_js
from base64 import b64decode
from scipy.io.wavfile import read as wav_read
import io
import ffmpeg
import pydub
import whisper
import warnings
warnings.filterwarnings('ignore')

# Load Whisper's "base" model once; it is used below for language detection and decoding.
model = whisper.load_model("base")


In [2]:
# HTML/JavaScript snippet injected into the notebook output by get_audio().
# It adds a button, asks the browser for microphone access and starts a
# MediaRecorder as soon as access is granted. Clicking the button stops the
# recording; the recorded blob is shown in an <audio> preview and read as a
# base64 data URL. The JavaScript promise `data` resolves with that data URL
# 2000 ms after the click (a fixed wait, not an event-driven one).
AUDIO_HTML = """
<script>
var my_div = document.createElement("DIV");
var my_p = document.createElement("P");
var my_btn = document.createElement("BUTTON");
var t = document.createTextNode("Press to start recording");

my_btn.appendChild(t);
//my_p.appendChild(my_btn);
my_div.appendChild(my_btn);
document.body.appendChild(my_div);

var base64data = 0;
var reader;
var recorder, gumStream;
var recordButton = my_btn;

var handleSuccess = function(stream) {
  gumStream = stream;
  var options = {
    //bitsPerSecond: 8000, //chrome seems to ignore, always 48k
    mimeType : 'audio/webm;codecs=opus'
    //mimeType : 'audio/webm;codecs=pcm'
  };
  //recorder = new MediaRecorder(stream, options);
  recorder = new MediaRecorder(stream);
  recorder.ondataavailable = function(e) {
    var url = URL.createObjectURL(e.data);
    var preview = document.createElement('audio');
    preview.controls = true;
    preview.src = url;
    document.body.appendChild(preview);

    reader = new FileReader();
    reader.readAsDataURL(e.data);
    reader.onloadend = function() {
      base64data = reader.result;
      //console.log("Inside FileReader:" + base64data);
    }
  };
  recorder.start();
  };

recordButton.innerText = "Recording... press to stop";

navigator.mediaDevices.getUserMedia({audio: true}).then(handleSuccess);


function toggleRecording() {
  if (recorder && recorder.state == "recording") {
      recorder.stop();
      gumStream.getAudioTracks()[0].stop();
      recordButton.innerText = "Saving the recording... pls wait!"
  }
}

// https://stackoverflow.com/a/951057
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

var data = new Promise(resolve=>{
//recordButton.addEventListener("click", toggleRecording);
recordButton.onclick = ()=>{
toggleRecording()

sleep(2000).then(() => {
  // wait 2000ms for the data to be available...
  // ideally this should use something like await...
  //console.log("Inside data:" + base64data)
  resolve(base64data.toString())

});

}
});

</script>
"""

def get_audio():
  """Record audio in the browser and return it as ``(audio, sr)``.

  Displays the recording widget, waits for the JavaScript ``data`` promise,
  converts the recording to WAV by piping it through ffmpeg, and parses the
  WAV bytes.

  Returns:
    ``(audio, sr)``: the sample array and the sample rate read from the WAV data.
  """
  display(HTML(AUDIO_HTML))
  data_url = eval_js("data")

  # Keep only the base64 payload that follows the first comma of the data URL.
  encoded = data_url.split(',')[1]
  binary = b64decode(encoded)

  converter = (
    ffmpeg
    .input('pipe:0')
    .output('pipe:1', format='wav')
    .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True, quiet=True, overwrite_output=True)
  )
  output, _ = converter.communicate(input=binary)

  # Replace bytes 4:8 of the piped output with the actual RIFF chunk size
  # (total length minus 8), written as four little-endian bytes.
  riff_chunk_size = len(output) - 8
  size_field = (riff_chunk_size & 0xFFFFFFFF).to_bytes(4, 'little')
  riff = output[:4] + size_field + output[8:]

  sr, audio = wav_read(io.BytesIO(riff))
  return audio, sr


def write(f, sr, x, normalized=False):
    """Export a numpy sample array as an MP3 file.

    Args:
        f: Output path or file object passed to ``AudioSegment.export``.
        sr: Sample rate of ``x``.
        x: Samples; a 2-D array with two columns is treated as stereo,
            anything else as mono.
        normalized: When true, ``x`` holds floats in [-1, 1) and is scaled to
            the int16 range before conversion.
    """
    is_stereo = x.ndim == 2 and x.shape[1] == 2
    channels = 2 if is_stereo else 1

    # Samples are always stored as 16-bit integers (sample_width=2 bytes).
    samples = x * 2 ** 15 if normalized else x
    y = np.int16(samples)

    song = pydub.AudioSegment(y.tobytes(), frame_rate=sr, sample_width=2, channels=channels)
    song.export(f, format="mp3", bitrate="320k")


In [7]:
# Record from the microphone and store the recording as MP3 for Whisper.
audio, sr = get_audio()
write('/content/out2.mp3', sr, audio)

# load audio and pad/trim it to fit 30 seconds
audio = whisper.load_audio('/content/out2.mp3')
audio = whisper.pad_or_trim(audio)

# make log-Mel spectrogram and move to the same device as the model
mel = whisper.log_mel_spectrogram(audio).to(model.device)

# detect the spoken language
_, probs = model.detect_language(mel)
input_language = max(probs, key=probs.get)
print(f"Detected language: {input_language}")

# decode the audio
options = whisper.DecodingOptions()
result = whisper.decode(model, mel, options)

# print the recognized text
print(result.text)

# Load the weights that match the detected language: English input uses the
# English-to-Spanish checkpoint, anything else the Spanish-to-English one.
# NOTE(review): the tokenizers and the encoder/decoder in memory were built for
# TRANSLATION_PATH only; restoring the other direction's checkpoint into them
# looks like it would not match their vocabularies - confirm before relying on it.
if input_language == 'en':
  checkpoint.restore(tf.train.latest_checkpoint('/content/drive/My Drive/LiveConv/EngSpa2/'))
else:
  checkpoint.restore(tf.train.latest_checkpoint('/content/drive/My Drive/LiveConv/SpaEng2/'))

# Translate the transcribed text (previously this referenced an undefined
# `sentence`, and the `else` branch was missing its colon).
translate(result.text)



Detected language: en
I have a sister.
