<a href="https://colab.research.google.com/github/aishanisingh/ML-Notes-in-Markdown/blob/master/english_to_spanish_translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# English to Spanish translation

## Import the libraries

In [None]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

## Dataset

In [None]:
# Download (and unpack) the English/Spanish sentence-pair archive, then point
# `dataset` at the tab-separated text file next to the downloaded archive.
archive_path = keras.utils.get_file(
    fname="spa-eng.zip",
    origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
    extract=True,
)
dataset = pathlib.Path(archive_path).parent / "spa-eng" / "spa.txt"

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


## Convert the data

In [None]:
# Parse the corpus into (english, spanish) pairs.
# The file contains accented Spanish text, so the encoding is stated explicitly
# instead of relying on the platform default.
with open(dataset, encoding="utf-8") as f:
    # The text after the final newline is not a sentence pair, hence [:-1].
    sentences = f.read().split("\n")[:-1]
data = []
for sentence in sentences:
    # Each line is "<english>\t<spanish>".
    english, spanish = sentence.split("\t")
    # Wrap the target sentence in explicit start/end marker tokens.
    spanish = "[start] " + spanish + " [end]"
    data.append((english, spanish))

In [None]:
# Show a handful of randomly picked sentence pairs as a sanity check.
for i in range(5):
    sample_pair = random.choice(data)
    print(sample_pair)

('People speak so much about the need for leaving a better planet for our children, and forget the urgency of leaving better children for our planet.', '[start] La gente habla tanto de que necesitan dejar un mejor planeta para nuestros hijos, y se olvidan de la urgencia de dejar mejores niños para nuestro planeta. [end]')
('Nobody knows the truth.', '[start] Nadie sabe la verdad. [end]')
('Do you want to talk?', '[start] ¿Querés hablar? [end]')
("No one's working.", '[start] Nadie está trabajando. [end]')
('I heard an unusual sound.', '[start] Oí un ruido extraño. [end]')


In [None]:
# Shuffle in place, then carve out 15% for validation and 15% for testing;
# everything that remains is used for training.
random.shuffle(data)
pairs = int(0.15 * len(data))
sampling = len(data) - 2 * pairs
training, validation, testing = (
    data[:sampling],
    data[sampling : sampling + pairs],
    data[sampling + pairs :],
)

# Report the size of the full corpus and of each split.
for label, subset in (
    ("total", data),
    ("training", training),
    ("validation", validation),
    ("test", testing),
):
    print(f"{len(subset)} {label} pairs")

118964 total pairs
83276 training pairs
17844 validation pairs
17844 test pairs


In [None]:
# Characters stripped from the Spanish text: ASCII punctuation plus "¿",
# except the square brackets, which are needed by the "[start]"/"[end]" markers.
character = (string.punctuation + "¿").replace("[", "").replace("]", "")

# Vectorization / batching hyper-parameters.
vocabulary = 15000  # maximum number of tokens kept per language
size = 20           # output sequence length of the English vectorizer
num_samples = 64    # batch size


def custom_standardization(input_string):
    """Lower-case the input and delete every character listed in `character`.

    Args:
        input_string: String tensor handed over by the `TextVectorization` layer.

    Returns:
        The lower-cased string tensor with the listed characters removed.
    """
    strip_pattern = "[%s]" % re.escape(character)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, strip_pattern, "")


# English side: default standardization, sequences of `size` token ids.
english_vectorization = TextVectorization(
    max_tokens=vocabulary,
    output_mode="int",
    output_sequence_length=size,
)
# Spanish side: one extra position, because the sequence is later split into
# decoder inputs ([:-1]) and targets ([1:]); custom standardization is used so
# the "[start]"/"[end]" markers keep their brackets.
spanish_vectorization = TextVectorization(
    max_tokens=vocabulary,
    output_mode="int",
    output_sequence_length=size + 1,
    standardize=custom_standardization,
)

# Learn both vocabularies from the training split only.
english_sentences = [english_text for english_text, _ in training]
spanish_sentences = [spanish_text for _, spanish_text in training]
english_vectorization.adapt(english_sentences)
spanish_vectorization.adapt(spanish_sentences)

In [None]:
def format_dataset(english, spanish):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        english: Batch of English strings.
        spanish: Batch of Spanish strings.

    Returns:
        A `(features, labels)` tuple. `features` maps "encoder_inputs" to the
        English token ids and "decoder_inputs" to the Spanish token ids without
        their last position; `labels` is the Spanish token ids without their
        first position (i.e. shifted by one step).
    """
    encoder_tokens = english_vectorization(english)
    decoder_tokens = spanish_vectorization(spanish)
    features = {
        "encoder_inputs": encoder_tokens,
        "decoder_inputs": decoder_tokens[:, :-1],
    }
    labels = decoder_tokens[:, 1:]
    return (features, labels)


def make_dataset(pairs):
    """Build a batched, vectorized `tf.data.Dataset` from sentence pairs.

    Args:
        pairs: Non-empty sequence of `(english, spanish)` string tuples.

    Returns:
        A dataset yielding `(features, labels)` batches as produced by
        `format_dataset`, with `num_samples` pairs per batch.
    """
    english_texts, spanish_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices(
        (list(english_texts), list(spanish_texts))
    )
    dataset = dataset.batch(num_samples)
    dataset = dataset.map(format_dataset)
    # Cache first so the vectorized batches are computed once, then shuffle so
    # each epoch sees a fresh order (caching *after* shuffling would replay the
    # same order every epoch), and prefetch last so it overlaps with training.
    return dataset.cache().shuffle(2048).prefetch(16)


# Build the batched, vectorized pipelines for the training and validation splits.
training_dataset = make_dataset(training)
validation_dataset = make_dataset(validation)

In [None]:
# Inspect the shapes of a single batch.
for source, destination in training_dataset.take(1):
    for key in ("encoder_inputs", "decoder_inputs"):
        print(f'source["{key}"].shape: {source[key].shape}')
    print(f"destination.shape: {destination.shape}")

source["encoder_inputs"].shape: (64, 20)
source["decoder_inputs"].shape: (64, 20)
destination.shape: (64, 20)


## Model

In [None]:
from tensorflow.python.types.core import Value

class TransformerEncoder(layers.Layer):
    """Transformer encoder block: self-attention followed by a two-layer projection.

    Both sub-blocks use a residual connection followed by layer normalization.

    Args:
        embedding_dimension: Used as `key_dim` of the attention layer and as the
            output width of the projection.
        dimensionality: Width of the hidden `relu` layer of the projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, embedding_dimension, dimensionality, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embedding_dimension = embedding_dimension
        self.dimensionality = dimensionality
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embedding_dimension
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dimensionality, activation="relu"), layers.Dense(embedding_dimension),]
        )
        self.normalization1 = layers.LayerNormalization()
        self.normalization2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, source, mask=None):
        """Apply the encoder block.

        Args:
            source: Input tensor to attend over.
            mask: Optional mask for `source`; when omitted, attention is unmasked.

        Returns:
            The normalized output of the projection sub-block.
        """
        # Default to "no mask" so the layer also works when Keras passes
        # mask=None (previously this path hit an unbound local variable).
        attention_scores = None
        if mask is not None:
            # Insert an axis after the batch axis so the mask broadcasts over
            # the query positions.
            attention_scores = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=source, value=source, key=source, attention_mask=attention_scores
        )
        proj_input = self.normalization1(source + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.normalization2(proj_input + proj_output)

    def get_config(self):
        """Return the layer config including the constructor arguments."""
        parameter = super().get_config()
        parameter.update({
            "embedding_dimension": self.embedding_dimension,
            "dimensionality": self.dimensionality,
            "num_heads": self.num_heads,
        })
        return parameter


class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        size: Number of distinct positions in the position-embedding table.
        vocabulary: Number of distinct token ids in the token-embedding table.
        embedding_dimension: Width of both embeddings.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, size, vocabulary, embedding_dimension, **kwargs):
        super().__init__(**kwargs)
        self.size = size
        self.vocabulary = vocabulary
        self.embedding_dimension = embedding_dimension
        self.token_embeddings = layers.Embedding(
            input_dim=vocabulary, output_dim=embedding_dimension
        )
        self.position_embeddings = layers.Embedding(
            input_dim=size, output_dim=embedding_dimension
        )

    def call(self, source):
        """Embed the token ids in `source` and add the embedding of each position."""
        sequence_length = tf.shape(source)[-1]
        positions = tf.range(start=0, limit=sequence_length, delta=1)
        return self.token_embeddings(source) + self.position_embeddings(positions)

    def compute_mask(self, source, mask=None):
        """Mark every position whose token id is non-zero as valid."""
        return tf.math.not_equal(source, 0)

    def get_config(self):
        """Return the layer config including the constructor arguments."""
        return {
            **super().get_config(),
            "size": self.size,
            "vocabulary": self.vocabulary,
            "embedding_dimension": self.embedding_dimension,
        }


class TransformerDecoder(layers.Layer):
    """Transformer decoder block.

    Runs causal self-attention over the decoder sequence, then attention over
    the encoder outputs, then a two-layer projection. Each sub-block uses a
    residual connection followed by layer normalization.

    Args:
        embedding_dimension: Used as `key_dim` of both attention layers and as
            the output width of the projection.
        internal_dimension: Width of the hidden `relu` layer of the projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to `layers.Layer`.
    """

    def __init__(self, embedding_dimension, internal_dimension, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embedding_dimension = embedding_dimension
        self.internal_dimension = internal_dimension
        self.num_heads = num_heads
        self.multihead_attention1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embedding_dimension
        )
        self.multihead_attention2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embedding_dimension
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(internal_dimension, activation="relu"), layers.Dense(embedding_dimension),]
        )
        self.normalization1 = layers.LayerNormalization()
        self.normalization2 = layers.LayerNormalization()
        self.normalization3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, source, encoder_outputs, mask=None):
        """Apply the decoder block.

        Args:
            source: Decoder-side input tensor.
            encoder_outputs: Encoder output attended to by the second attention layer.
            mask: Optional mask; when omitted, the second attention layer runs
                unmasked.

        Returns:
            The normalized output of the projection sub-block.
        """
        causal_mask = self.get_causal_attention_mask(source)
        # Default to "no mask" so the layer also works when Keras passes
        # mask=None (previously this path hit an unbound local variable).
        attention_scores = None
        if mask is not None:
            attention_scores = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            attention_scores = tf.minimum(attention_scores, causal_mask)
        # NOTE(review): the mask combined with the causal mask above is applied
        # to the attention over `encoder_outputs`, while the self-attention only
        # gets the causal mask. This presumably relies on encoder and decoder
        # sequences having the same length; left unchanged so the behaviour
        # matches the saved checkpoint -- confirm before retraining.

        attention_output_1 = self.multihead_attention1(
            query=source, value=source, key=source, attention_mask=causal_mask
        )
        out_1 = self.normalization1(source + attention_output_1)

        attention_output_2 = self.multihead_attention2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=attention_scores,
        )
        out_2 = self.normalization2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.normalization3(out_2 + proj_output)

    def get_causal_attention_mask(self, source):
        """Build a lower-triangular int32 mask, tiled once per batch element.

        Entry `[b, i, j]` is 1 when `i >= j` and 0 otherwise, so position `i`
        can only attend to positions up to and including itself.
        """
        input_shape = tf.shape(source)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single mask along the batch axis only.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the layer config including the constructor arguments."""
        parameter = super().get_config()
        parameter.update({
            "embedding_dimension": self.embedding_dimension,
            "internal_dimension": self.internal_dimension,
            "num_heads": self.num_heads,
        })
        return parameter


In [None]:
# Model hyper-parameters.
embedding_dimension = 256
internal_dimension = 2048
num_heads = 8

# Encoder: token ids -> positional embedding -> Transformer encoder block.
source_encoder = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
sample = PositionalEmbedding(size, vocabulary, embedding_dimension)(source_encoder)
destination_encoder = TransformerEncoder(embedding_dimension, internal_dimension, num_heads)(sample)
encoder = keras.Model(source_encoder, destination_encoder)

# Decoder: token ids plus an encoder-state input -> positional embedding ->
# Transformer decoder block -> dropout -> softmax over the vocabulary.
source_decoder = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
source_sequence = keras.Input(shape=(None, embedding_dimension), name="decoder_state_inputs")
sample = PositionalEmbedding(size, vocabulary, embedding_dimension)(source_decoder)
sample = TransformerDecoder(embedding_dimension, internal_dimension, num_heads)(sample, source_sequence)
sample = layers.Dropout(0.5)(sample)
destination_decoder = layers.Dense(vocabulary, activation="softmax")(sample)
decoder = keras.Model([source_decoder, source_sequence], destination_decoder)

# End-to-end model: the encoder output is fed to the decoder as its state input.
destination_decoder = decoder([source_decoder, destination_encoder])
transformer = keras.Model(
    [source_encoder, source_decoder], destination_decoder, name="transformer"
)

## Train the model

In [None]:
# Training is commented out because it has already been run; uncomment the lines below to retrain.
#epochs = 30

#transformer.summary()
#transformer.compile(
#    "rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
#)
#transformer.fit(training_dataset, epochs=epochs, validation_data=validation_dataset)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_inputs (InputLayer  [(None, None)]               0         []                            
 )                                                                                                
                                                                                                  
 positional_embedding (Posi  (None, None, 256)            3845120   ['encoder_inputs[0][0]']      
 tionalEmbedding)                                                                                 
                                                                                                  
 decoder_inputs (InputLayer  [(None, None)]               0         []                            
 )                                                                                      

<keras.src.callbacks.History at 0x7a557c4202e0>

In [41]:
# prompt: save transformer
# Saved after the first successful execution.

#transformer.save_weights('./checkpoints/my_checkpoint')


In [44]:
# Load previously saved weights into the model from the checkpoint path.
transformer.load_weights("./checkpoints/my_checkpoint")

<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x7a5503b11480>

## Decode the sentences

In [45]:
# Lookup table from token id to Spanish token, plus the maximum number of
# decoding steps.
spanish_vocabulary = spanish_vectorization.get_vocabulary()
spanish_index = dict(enumerate(spanish_vocabulary))
decoded_size = 20


def decode(original_sentence):
    """Greedily translate one English sentence with the trained transformer.

    Starting from "[start]", the model is called once per step on the text
    decoded so far; the highest-scoring token at the current step is appended.
    Decoding stops after `decoded_size` steps or as soon as "[end]" is produced.

    Args:
        original_sentence: English sentence to translate.

    Returns:
        The decoded text, including the leading "[start]" marker (and the
        trailing "[end]" marker when one was produced).
    """
    source_units = english_vectorization([original_sentence])
    words = ["[start]"]
    for step in range(decoded_size):
        partial_translation = " ".join(words)
        destination_units = spanish_vectorization([partial_translation])[:, :-1]
        predictions = transformer([source_units, destination_units])

        # Pick the most probable token at the current output position.
        next_word = spanish_index[np.argmax(predictions[0, step, :])]
        words.append(next_word)

        if next_word == "[end]":
            break
    return " ".join(words)


# Translate 30 randomly chosen English sentences from the held-out test split.
english_testing = [english_text for english_text, _spanish_text in testing]
for _ in range(30):
    original_sentence = random.choice(english_testing)
    print("English: ", original_sentence)
    translated_sentence = decode(original_sentence)
    print("Espanol: ", translated_sentence)

English:  I don't want to take risks.
Espanol:  [start] no quiero llevar todos los nombres [end]
English:  He was an Olympic champion in weightlifting.
Espanol:  [start] Él era un [UNK] [UNK] [end]
English:  I know what's at stake.
Espanol:  [start] sé lo que está en juego [end]
English:  What time did you get here this morning?
Espanol:  [start] a qué hora te has llegado esta mañana [end]
English:  Tom still has some time.
Espanol:  [start] tom todavía tiene un poco de tiempo [end]
English:  I think otherwise.
Espanol:  [start] creo lo mismo de una forma [end]
English:  I'm angry about what happened, too.
Espanol:  [start] también estoy enojado por lo que había dicho [end]
English:  The shop is just in front of the station.
Espanol:  [start] la tienda está justo al frente de la estación [end]
English:  Tom was impatient to see Mary again.
Espanol:  [start] tom está impaciente por ver a mary de nuevo [end]
English:  She was kind enough to accompany me to the station.
Espanol:  [start] 

## Translating a specific sentence

In [None]:
# Translate one hand-written sentence and print the source next to the result.
original_sentence = "This will help you"
print("English: ",original_sentence)
translated_sentence = decode(original_sentence)
print("Espanol: ",translated_sentence)

English:  This will help you
Espanol:  [start] esto te va a ayudar [end]


# Using Chrome audio and speech-to-text

In [None]:
!pip install einops
!pip install ffmpeg-python
!pip install huggingsound
!pip install pydub

model = SpeechRecognitionModel("jonatasgrosman/wav2vec2-large-xlsr-53-english")

In [70]:
from IPython.display import HTML, Audio
from google.colab.output import eval_js
from base64 import b64decode
from scipy.io.wavfile import read as wav_read
import io
import ffmpeg
import pydub
from huggingsound import SpeechRecognitionModel

# HTML/JavaScript snippet that records from the microphone in the browser.
# It exposes a global promise named `data`, which `get_audio` reads through
# `eval_js("data")`. The promise resolves with the recording as a base64 data
# URL once the FileReader has finished (instead of after a fixed delay, which
# could hand back a placeholder value), and rejects if the microphone cannot
# be opened.
AUDIO_HTML = """
<script>
var my_div = document.createElement("DIV");
var my_btn = document.createElement("BUTTON");
var t = document.createTextNode("Press to start recording");

my_btn.appendChild(t);
my_div.appendChild(my_btn);
document.body.appendChild(my_div);

var reader;
var recorder, gumStream;
var recordButton = my_btn;

// `data` settles only when the recording has really been read (or failed).
var resolveData, rejectData;
var data = new Promise(function(resolve, reject) {
  resolveData = resolve;
  rejectData = reject;
});

var handleSuccess = function(stream) {
  gumStream = stream;
  recorder = new MediaRecorder(stream);
  recorder.ondataavailable = function(e) {
    var url = URL.createObjectURL(e.data);
    var preview = document.createElement('audio');
    preview.controls = true;
    preview.src = url;
    document.body.appendChild(preview);

    reader = new FileReader();
    reader.onloadend = function() {
      // Hand the finished data URL to the waiting Python side.
      resolveData(reader.result.toString());
    };
    reader.readAsDataURL(e.data);
  };
  recorder.start();
};

var handleFailure = function(err) {
  recordButton.innerText = "Microphone unavailable";
  rejectData(new Error("Could not start recording: " + err));
};

recordButton.innerText = "Recording... press to stop";

navigator.mediaDevices.getUserMedia({audio: true}).then(handleSuccess).catch(handleFailure);


function toggleRecording() {
  if (recorder && recorder.state == "recording") {
      recorder.stop();
      gumStream.getAudioTracks()[0].stop();
      recordButton.innerText = "Saving the recording... pls wait!"
  }
}

// Stopping the recorder triggers ondataavailable, which resolves `data`.
recordButton.onclick = toggleRecording;

</script>
"""

def get_audio():
  """Record audio in the browser and return it as decoded WAV samples.

  Displays the `AUDIO_HTML` recorder, waits for its `data` value (a base64
  data URL), converts the recording to WAV with ffmpeg and parses the result.

  Returns:
    A tuple `(audio, sr)` as read by `wav_read`: the samples and the sample rate.

  Raises:
    ValueError: If the browser did not return a data URL with a payload.
    RuntimeError: If ffmpeg returned too little data to be a WAV file.
  """
  display(HTML(AUDIO_HTML))
  data = eval_js("data")
  # A data URL looks like "<header>,<base64 payload>".
  _, separator, payload = data.partition(',')
  if not separator:
    raise ValueError("Browser did not return a base64 data URL for the recording")
  binary = b64decode(payload)

  # Convert the recording to WAV entirely through pipes.
  process = (ffmpeg
    .input('pipe:0')
    .output('pipe:1', format='wav')
    .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True, quiet=True, overwrite_output=True)
  )
  output, err = process.communicate(input=binary)
  if len(output) < 8:
    raise RuntimeError(
      "ffmpeg did not produce WAV data: " + err.decode(errors="replace")
    )

  # Overwrite bytes 4:8 of the header with the actual RIFF chunk size
  # (little-endian); presumably ffmpeg cannot fill it in when writing to a pipe.
  riff_chunk_size = len(output) - 8
  riff = output[:4] + riff_chunk_size.to_bytes(4, "little") + output[8:]

  sr, audio = wav_read(io.BytesIO(riff))

  return audio, sr


def write(f, sr, x, normalized=False):
    """Export a NumPy sample array as an MP3 file.

    Args:
        f: Destination path or file object passed to `AudioSegment.export`.
        sr: Frame rate of the samples.
        x: Sample array; treated as stereo when it is 2-D with two columns,
            otherwise as mono.
        normalized: When True, `x` holds floats in [-1, 1) that are scaled to
            the 16-bit range; otherwise `x` is cast to 16-bit integers as is.
    """
    is_stereo = x.ndim == 2 and x.shape[1] == 2
    samples = np.int16(x * 2 ** 15) if normalized else np.int16(x)
    segment = pydub.AudioSegment(
        samples.tobytes(),
        frame_rate=sr,
        sample_width=2,
        channels=2 if is_stereo else 1,
    )
    segment.export(f, format="mp3", bitrate="320k")



In [71]:

# Record from the microphone, then save the samples as an MP3 file.
audio, sr = get_audio()
write('out2.mp3', sr, audio)

In [72]:
# Use the same relative path the recording was written to, rather than a
# hard-coded absolute directory that only exists on one environment.
audio_paths = ['out2.mp3']

# Speech -> English text -> Spanish translation.
transcriptions = model.transcribe(audio_paths)
sentence = transcriptions[0]["transcription"]
print("English: " +sentence)

print("Spanish: " +decode(sentence))





100%|██████████| 1/1 [00:03<00:00,  3.23s/it]


English: i love you
Spanish: [start] te amo [end]
