# **Building an English-to-German Translation Module**

[Click here to read the blog post](https://medium.com/@rameeshamalik.143/building-an-english-to-german-translation-module-7e2e2680108b)

# Necessary Imports & Library Installation

In [12]:
!pip install gtts deepgram-sdk tensorflow numpy tensorflow-datasets gradio



In [13]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Embedding, Dense, LayerNormalization, Dropout
from tensorflow.keras.optimizers import Adam
import requests
from gtts import gTTS
import IPython.display as ipd
import pandas as pd
import tensorflow_datasets as tfds
import gradio as gr


# Dataset

In [14]:
# Read the three WMT14 de-en CSV splits (train, validation, test - in that order).
train_df, val_df, test_df = (
    pd.read_csv(csv_path, encoding='latin-1')
    for csv_path in (
        '/content/drive/MyDrive/NLP/wmt14_translate_de-en_train.csv',
        '/content/drive/MyDrive/NLP/wmt14_translate_de-en_validation.csv',
        '/content/drive/MyDrive/NLP/wmt14_translate_de-en_test.csv',
    )
)

# Discard, in place, every row that is missing either side of the sentence pair.
for split_df in (train_df, val_df, test_df):
    split_df.dropna(subset=["en", "de"], inplace=True)

# Print the (rows, columns) shape of each split as a quick sanity check.
print(f"Training dataset shape: {train_df.shape}")
print(f"Validation dataset shape: {val_df.shape}")
print(f"Test dataset shape: {test_df.shape}")


  train_df = pd.read_csv('/content/drive/MyDrive/NLP/wmt14_translate_de-en_train.csv', encoding='latin-1')


Training dataset shape: (29993, 141)
Validation dataset shape: (3000, 2)
Test dataset shape: (3003, 2)


In [15]:
# Tokenize English and German
# Both subword vocabularies are learned from the training split only and
# share the same target vocabulary size.
_TARGET_VOCAB_SIZE = 2 ** 13
_build_subword_encoder = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus

tokenizer_en = _build_subword_encoder(
    iter(train_df['en']), target_vocab_size=_TARGET_VOCAB_SIZE)
tokenizer_de = _build_subword_encoder(
    iter(train_df['de']), target_vocab_size=_TARGET_VOCAB_SIZE)

# Ids reserved for the sequence start / end markers.
# NOTE(review): presumably the subword encoders can also emit ids 1 and 2 for
# real subwords, in which case these markers would be ambiguous - confirm.
START_TOKEN, END_TOKEN = 1, 2


In [16]:
def create_tf_dataset(data, batch_size=64, max_seq_len=50):
    """Build a padded, batched ``tf.data.Dataset`` of (English, German) id pairs.

    Args:
        data: Either a pandas DataFrame with text columns ``"en"`` and ``"de"``
            (each row is encoded with the module-level ``tokenizer_en`` and
            ``tokenizer_de``), or a re-iterable of ``(en_ids, de_ids)`` pairs
            of integer token ids.
        batch_size: Number of sentence pairs per batch.
        max_seq_len: Pairs in which either side has more than this many
            tokens are dropped.

    Returns:
        A dataset yielding ``(en_batch, de_batch)`` int64 tensors, each padded
        with zeros to the longest sequence in its batch.
    """
    def pair_generator():
        if isinstance(data, pd.DataFrame):
            # Iterating a DataFrame directly yields its column labels, not
            # its rows, so walk the two text columns in lockstep and encode.
            for en_text, de_text in zip(data["en"], data["de"]):
                yield (
                    np.asarray(tokenizer_en.encode(str(en_text)), dtype=np.int64),
                    np.asarray(tokenizer_de.encode(str(de_text)), dtype=np.int64),
                )
        else:
            for en_ids, de_ids in data:
                # Force int64 so the elements match the declared signature
                # (plain Python ints would otherwise become int32 tensors).
                yield (
                    np.asarray(en_ids, dtype=np.int64),
                    np.asarray(de_ids, dtype=np.int64),
                )

    dataset = tf.data.Dataset.from_generator(
        pair_generator,
        output_signature=(
            tf.TensorSpec(shape=(None,), dtype=tf.int64),
            tf.TensorSpec(shape=(None,), dtype=tf.int64),
        )
    )
    # Filter sequences exceeding max length. tf.logical_and keeps the
    # predicate a graph op instead of relying on Python `and` being
    # rewritten for tensors.
    dataset = dataset.filter(
        lambda x, y: tf.logical_and(tf.size(x) <= max_seq_len,
                                    tf.size(y) <= max_seq_len)
    )
    # Pad sequences to the same length within each batch
    dataset = dataset.padded_batch(batch_size, padded_shapes=([None], [None]))
    return dataset

# Build the training and validation pipelines with the default batch size
# and maximum sequence length.
train_dataset, val_dataset = (
    create_tf_dataset(split_frame) for split_frame in (train_df, val_df)
)


# Transformer

In [17]:
# Positional Encoding
def positional_encoding(max_len, d_model):
    """Return the sinusoidal position table as a float32 tensor.

    Args:
        max_len: Number of positions to encode.
        d_model: Width of each position vector.

    Returns:
        A tensor of shape ``(1, max_len, d_model)``; even-indexed columns hold
        sines and odd-indexed columns hold cosines of the position angles.
    """
    positions = np.arange(max_len)[:, np.newaxis]
    dims = np.arange(d_model)[np.newaxis, :]
    # Each (even, odd) column pair shares the same frequency exponent.
    exponents = (2 * (dims // 2)) / np.float32(d_model)
    table = positions / np.power(10000, exponents)
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])
    # Add the leading axis so the table broadcasts over the batch dimension.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

# Scaled Dot-Product Attention
def scaled_dot_product_attention(q, k, v, mask=None):
    """Compute softmax(q @ k^T / sqrt(depth_k)) @ v.

    Args:
        q: Query tensor.
        k: Key tensor; the size of its last axis sets the scaling factor.
        v: Value tensor.
        mask: Optional tensor added to the logits after scaling by -1e9, so
            positions where it is 1 receive (almost) zero attention weight.

    Returns:
        A ``(output, attention_weights)`` tuple.
    """
    key_depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(key_depth)
    if mask is not None:
        logits = logits + (mask * -1e9)
    weights = tf.nn.softmax(logits, axis=-1)
    context = tf.matmul(weights, v)
    return context, weights

# Multi-Head Attention
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Queries, keys and values are each projected to ``d_model`` features,
    split into ``num_heads`` heads of ``d_model // num_heads`` features,
    attended per head, re-merged and passed through a final dense layer.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        # Every head must receive the same number of features.
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)
        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) into (batch, num_heads, seq, depth)."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, q, k, v, mask):
        """Attend from ``q`` over ``k``/``v`` and return the merged projection."""
        batch_size = tf.shape(q)[0]
        q_heads = self.split_heads(self.wq(q), batch_size)
        k_heads = self.split_heads(self.wk(k), batch_size)
        v_heads = self.split_heads(self.wv(v), batch_size)
        # The per-head attention weights are not needed by callers here.
        attended, _ = scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)
        # Move the head axis back next to the feature axis and merge the two.
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(attended, (batch_size, -1, self.d_model))
        return self.dense(merged)

# Feed-Forward Network
def point_wise_feed_forward_network(d_model, dff):
    """Return a two-layer network: Dense(dff, relu) followed by Dense(d_model)."""
    expand = Dense(dff, activation='relu')
    project = Dense(d_model)
    return tf.keras.Sequential([expand, project])

# Transformer Block (Encoder/Decoder Layer)
class TransformerBlock(tf.keras.layers.Layer):
    """Self-attention followed by a feed-forward network.

    Each of the two sub-layers is wrapped in dropout, a residual connection
    and layer normalization.
    """

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training, mask):
        """Apply the block to ``x``; ``mask`` is forwarded to the attention."""
        # Sub-layer 1: self-attention (query, key and value are all ``x``).
        attended = self.dropout1(self.mha(x, x, x, mask), training=training)
        normed_attention = self.layernorm1(x + attended)
        # Sub-layer 2: position-wise feed-forward network.
        transformed = self.dropout2(self.ffn(normed_attention), training=training)
        return self.layernorm2(normed_attention + transformed)

# Encoder
class Encoder(tf.keras.layers.Layer):
    """Token embedding plus positional encoding, then ``num_layers`` blocks."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, max_seq_len, rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(max_seq_len, d_model)
        self.enc_layers = [
            TransformerBlock(d_model, num_heads, dff, rate)
            for _ in range(num_layers)
        ]
        self.dropout = Dropout(rate)

    def call(self, x, training, mask):
        """Encode token ids ``x``; ``mask`` is passed to every block."""
        seq_len = tf.shape(x)[1]
        embedded = self.embedding(x)
        # Scale the embeddings by sqrt(d_model) before adding the positions.
        embedded = embedded * tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        embedded = embedded + self.pos_encoding[:, :seq_len, :]
        hidden = self.dropout(embedded, training=training)
        for block in self.enc_layers[:self.num_layers]:
            hidden = block(hidden, training, mask)
        return hidden

# Decoder
class Decoder(tf.keras.layers.Layer):
    """Token embedding plus positional encoding, then ``num_layers`` decoder layers.

    Every decoder layer attends over the encoder output, so the generated
    target sequence is actually conditioned on the source sentence.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, max_seq_len, rate=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.embedding = Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(max_seq_len, d_model)
        self.dec_layers = [_DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        """Decode target ids ``x`` against ``enc_output``.

        Args:
            x: Target token ids.
            enc_output: Output of the encoder for the source sentence.
            training: Forwarded to the dropout layers.
            look_ahead_mask: Mask applied to the decoder self-attention.
            padding_mask: Mask applied to the attention over ``enc_output``.

        Returns:
            The output of the last decoder layer.
        """
        seq_len = tf.shape(x)[1]
        x = self.embedding(x)
        # Scale the embeddings by sqrt(d_model) before adding the positions.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]
        x = self.dropout(x, training=training)
        for i in range(self.num_layers):
            x = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
        return x


class _DecoderLayer(tf.keras.layers.Layer):
    """One decoder layer: masked self-attention, encoder-decoder attention, feed-forward."""

    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(_DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model, num_heads)
        self.cross_attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        # Sub-layer 1: self-attention over the target, using the look-ahead mask.
        self_attn = self.self_attention(x, x, x, look_ahead_mask)
        self_attn = self.dropout1(self_attn, training=training)
        out1 = self.layernorm1(x + self_attn)
        # Sub-layer 2: queries come from the target, keys/values from the encoder.
        cross_attn = self.cross_attention(out1, enc_output, enc_output, padding_mask)
        cross_attn = self.dropout2(cross_attn, training=training)
        out2 = self.layernorm2(out1 + cross_attn)
        # Sub-layer 3: position-wise feed-forward network.
        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        return self.layernorm3(out2 + ffn_output)

# Transformer Model
class Transformer(Model):
    """Encoder-decoder model ending in a dense projection to the target vocabulary."""

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, max_seq_len, rate=0.1):
        super().__init__()
        self.encoder = Encoder(
            num_layers, d_model, num_heads, dff, input_vocab_size, max_seq_len, rate)
        self.decoder = Decoder(
            num_layers, d_model, num_heads, dff, target_vocab_size, max_seq_len, rate)
        self.final_layer = Dense(target_vocab_size)

    def call(self, inputs, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        """Return vocabulary logits for ``inputs = (source_ids, target_ids)``."""
        source_ids = inputs[0]
        target_ids = inputs[1]
        memory = self.encoder(source_ids, training, enc_padding_mask)
        decoded = self.decoder(
            target_ids, memory, training, look_ahead_mask, dec_padding_mask)
        return self.final_layer(decoded)


In [18]:
def create_padding_mask(seq):
    """Return a float mask that is 1.0 wherever ``seq`` equals 0.

    Two singleton axes are inserted after the first axis so the result
    broadcasts against attention logits.
    """
    is_padding = tf.math.equal(seq, 0)
    return tf.cast(is_padding, tf.float32)[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
    """Return a ``(size, size)`` mask with 1s strictly above the diagonal."""
    # band_part(..., -1, 0) keeps the lower triangle including the diagonal.
    lower_triangle = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangle

def create_masks(inp, tar):
    """Build the three attention masks for a (source, target) batch.

    Returns:
        ``(enc_padding_mask, combined_mask, dec_padding_mask)`` where both
        padding masks are derived from ``inp`` and ``combined_mask`` is the
        element-wise maximum of the target padding mask and the look-ahead
        mask.
    """
    enc_padding_mask = create_padding_mask(inp)
    dec_padding_mask = create_padding_mask(inp)
    target_len = tf.shape(tar)[1]
    future_mask = create_look_ahead_mask(target_len)
    target_padding_mask = create_padding_mask(tar)
    combined_mask = tf.maximum(target_padding_mask, future_mask)
    return enc_padding_mask, combined_mask, dec_padding_mask


# Speech-to-Text Conversion

In [19]:
# Deepgram Speech-to-Text API
def speech_to_text(audio_file_path):
    """Transcribe an audio file with the Deepgram ``/v1/listen`` endpoint.

    The API key is read from the ``DEEPGRAM_API_KEY`` environment variable so
    that no credential is stored in the source.

    Args:
        audio_file_path: Path of the audio file to upload.

    Returns:
        The transcript of the first alternative of the first channel.

    Raises:
        RuntimeError: If ``DEEPGRAM_API_KEY`` is not set.
        OSError: If the audio file cannot be opened.
        requests.HTTPError: If Deepgram answers with an error status.
    """
    import os  # local import: only needed for the credential lookup

    api_url = "https://api.deepgram.com/v1/listen"
    api_key = os.environ.get("DEEPGRAM_API_KEY")
    if not api_key:
        raise RuntimeError("DEEPGRAM_API_KEY environment variable is not set.")
    headers = {
        "Authorization": f"Token {api_key}",
        # Same content type as the Gradio handler in this file uses.
        "Content-Type": "audio/wav",
    }
    with open(audio_file_path, "rb") as audio:
        # Send the raw audio bytes as the request body (not a multipart
        # form), and bound the wait so a stalled connection cannot hang.
        response = requests.post(api_url, headers=headers, data=audio, timeout=60)
    response.raise_for_status()
    return response.json()["results"]["channels"][0]["alternatives"][0]["transcript"]

# Text-to-Speech Output

In [20]:
# Text-to-Speech Function
def text_to_speech(text, language="de",
                   output_path="/content/drive/MyDrive/NLP/translated_audio.wav"):
    """Synthesize ``text`` with gTTS, save it, and return a playable widget.

    Args:
        text: Text to speak.
        language: Language code passed to gTTS (German by default).
        output_path: Where the synthesized audio is written. Defaults to the
            notebook's original Drive location.
            NOTE(review): gTTS presumably writes MP3 data regardless of the
            file extension, so ".wav" here may be a misnomer - confirm.

    Returns:
        An ``IPython.display.Audio`` object for the saved file.
    """
    tts = gTTS(text, lang=language)
    tts.save(output_path)
    return ipd.Audio(output_path)

# Integration

In [21]:
import gradio as gr
import requests

def translate_speech_to_german(audio_file):
    """Transcribe an English audio file and translate the transcript to German.

    This is the Gradio handler, so every failure is reported as a short
    message string instead of an exception; details are printed for debugging.
    The Deepgram API key is read from the ``DEEPGRAM_API_KEY`` environment
    variable so that no credential is stored in the source.

    Args:
        audio_file: Path of the uploaded audio file.

    Returns:
        The German translation, or an error message string if any step fails.
    """
    import os  # local import: only needed for the credential lookup

    # Upper bound (seconds) on each HTTP call so the UI cannot hang forever.
    request_timeout = 60

    # Step 1: Debugging - Print the received file path
    print("Received file path:", audio_file)

    # Step 2: Open and read the audio file
    try:
        with open(audio_file, 'rb') as f:
            audio_data = f.read()  # Read the file as binary
        print("Audio file read successfully!")
    except Exception as e:
        print(f"Error reading audio file: {e}")
        return "Error reading audio file."

    # Step 3: Transcription using Deepgram API
    deepgram_api_key = os.environ.get("DEEPGRAM_API_KEY")
    if not deepgram_api_key:
        print("Deepgram API error: DEEPGRAM_API_KEY environment variable is not set.")
        return "Error in transcription."

    try:
        response = requests.post(
            'https://api.deepgram.com/v1/listen',
            headers={
                'Authorization': f'Token {deepgram_api_key}',
                'Content-Type': 'audio/wav',  # Ensure the audio format matches Deepgram's requirements
            },
            data=audio_data,
            timeout=request_timeout,
        )

        if response.status_code != 200:
            # Print the raw body: an error response is not guaranteed to be JSON.
            print(f"Deepgram API error: {response.text}")
            return "Error in transcription."

        # Treat missing *or empty* lists as "no transcript" rather than
        # letting an IndexError turn into a generic error message.
        results = response.json().get('results', {})
        channels = results.get('channels') or [{}]
        alternatives = channels[0].get('alternatives') or [{}]
        transcription = alternatives[0].get('transcript', "")
        if not transcription:
            print("No transcription available.")
            return "No transcription generated."

        print("Transcription:", transcription)
    except Exception as e:
        print(f"Error during transcription: {e}")
        return "Error during transcription."

    # Step 4: Translation using LibreTranslate API
    try:
        LIBRETRANSLATE_API_URL = "https://libretranslate.de/translate"
        translation_response = requests.post(
            LIBRETRANSLATE_API_URL,
            headers={
                'Content-Type': 'application/json',
            },
            json={
                'q': transcription,
                'source': 'en',  # English source language
                'target': 'de',  # German target language
                'format': 'text',
            },
            timeout=request_timeout,
        )

        if translation_response.status_code != 200:
            # Print the raw body: an error response is not guaranteed to be JSON.
            print(f"LibreTranslate API error: {translation_response.text}")
            return "Error in translation."

        translated_text = translation_response.json().get('translatedText', "")
        print("Translated Text:", translated_text)
    except Exception as e:
        print(f"Error during translation: {e}")
        return "Error during translation."

    # Step 5: Return the translated text
    return translated_text

# Gradio Interface
# The handler opens the upload from disk, so ask Gradio for a file path.
audio_input = gr.Audio(type="filepath")

# Wire the handler to an audio-in / text-out web UI.
interface = gr.Interface(
    translate_speech_to_german,
    audio_input,
    "text",
    title="English-to-German Translator",
    description="Upload an English audio file to get its German translation.",
)

# Start the web server for the UI.
interface.launch()


Running Gradio in a Colab notebook requires sharing enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://f2d2e35f27a3a59c68.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


