In [2]:
# Install pinned PyTorch/torchtext into the kernel's own environment.
# The explicit %pip magic does not depend on IPython automagic being enabled.
%pip install torch==2.0.1 torchtext==0.15.2


Collecting torch==2.0.1
  Downloading torch-2.0.1-cp311-cp311-manylinux1_x86_64.whl.metadata (24 kB)
Collecting torchtext==0.15.2
  Downloading torchtext-0.15.2-cp311-cp311-manylinux1_x86_64.whl.metadata (7.4 kB)
Collecting nvidia-cuda-nvrtc-cu11==11.7.99 (from torch==2.0.1)
  Downloading nvidia_cuda_nvrtc_cu11-11.7.99-2-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu11==11.7.99 (from torch==2.0.1)
  Downloading nvidia_cuda_runtime_cu11-11.7.99-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cuda-cupti-cu11==11.7.101 (from torch==2.0.1)
  Downloading nvidia_cuda_cupti_cu11-11.7.101-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu11==8.5.0.96 (from torch==2.0.1)
  Downloading nvidia_cudnn_cu11-8.5.0.96-2-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu11==11.10.3.66 (from torch==2.0.1)
  Downloading nvidia_cublas_cu11-11.10.3.66-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Co

In [3]:
# Pin the Hugging Face `datasets` release this notebook was run with so a
# later release cannot silently change dataset loading behaviour.
%pip install datasets==3.3.2

Collecting datasets
  Downloading datasets-3.3.2-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Downloading datasets-3.3.2-py3-none-any.whl (485 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m485.4/485.4 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading multiprocess-0.70.16-py311-none-any.whl (143 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m143.5/143.5 kB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading xx

In [3]:
import torch
import torch.nn as nn
import math

class LayerNormalization(nn.Module):
    """Normalize the last dimension, then apply a learnable scale and shift."""

    def __init__(self, features: int, eps:float=10**-6) -> None:
        super().__init__()
        # Small constant added to the standard deviation so the division is safe.
        self.eps = eps
        # Learnable per-feature scale (initialised to 1) and shift (initialised to 0).
        self.alpha = nn.Parameter(torch.ones(features))
        self.bias = nn.Parameter(torch.zeros(features))

    def forward(self, x):
        """Return ``x`` normalized over its last dimension; the shape is unchanged."""
        # keepdim=True keeps a trailing size-1 axis so both statistics broadcast against x.
        centered = x - x.mean(dim=-1, keepdim=True)
        spread = x.std(dim=-1, keepdim=True)
        scaled = self.alpha * centered
        return scaled / (spread + self.eps) + self.bias

class FeedForwardBlock(nn.Module):
    """Two linear layers with a ReLU and dropout in between."""

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        # Expansion layer (w1, b1): d_model -> d_ff
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        # Contraction layer (w2, b2): d_ff -> d_model
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Map the last dimension d_model -> d_ff -> d_model."""
        hidden = self.linear_1(x)
        hidden = torch.relu(hidden)
        hidden = self.dropout(hidden)
        return self.linear_2(hidden)

class InputEmbeddings(nn.Module):
    """Token-id lookup table whose output is scaled by sqrt(d_model)."""

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Look up embeddings for the ids in ``x``, adding a trailing d_model axis."""
        # Scale factor used by the original code ("according to the paper").
        scale = math.sqrt(self.d_model)
        vectors = self.embedding(x)
        return vectors * scale

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to embeddings, then apply dropout.

    Args:
        d_model: size of the last (feature) dimension. Odd values are supported.
        seq_len: number of positions for which encodings are precomputed.
        dropout: dropout probability applied to the sum.
    """

    def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)
        # Table of shape (seq_len, d_model), filled column-wise below.
        pe = torch.zeros(seq_len, d_model)
        # Positions 0..seq_len-1 as a column vector: (seq_len, 1)
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        # 1 / 10000^(2i / d_model), computed in log space: (ceil(d_model / 2),)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Even feature indices: sin(position / 10000^(2i / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # Odd feature indices: cos(position / 10000^(2i / d_model)).
        # For odd d_model there is one fewer odd column than even columns, so
        # the frequency vector is trimmed to match (a no-op when d_model is even).
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Leading batch axis so the table broadcasts over a batch: (1, seq_len, d_model)
        pe = pe.unsqueeze(0)
        # A buffer moves with the module (.to / state_dict) but is not a trainable parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe[:, :x.shape[1]])``.

        ``x`` is indexed as (batch, seq_len, d_model); its second dimension must
        not exceed the ``seq_len`` given at construction.
        """
        # Buffers never require grad, so no explicit requires_grad_(False) is needed.
        x = x + self.pe[:, :x.shape[1], :]
        return self.dropout(x)

class ResidualConnection(nn.Module):
    """Pre-norm skip connection: ``x + dropout(sublayer(norm(x)))``."""

    def __init__(self, features: int, dropout: float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization(features)

    def forward(self, x, sublayer):
        """Apply ``sublayer`` to the normalized input and add the result back to ``x``."""
        normalized = self.norm(x)
        transformed = self.dropout(sublayer(normalized))
        return x + transformed

class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: embedding vector size; must be divisible by ``h``.
        h: number of attention heads.
        dropout: dropout probability applied to the attention weights.
    """

    def __init__(self, d_model: int, h: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model # Embedding vector size
        self.h = h # Number of heads
        # Make sure d_model is divisible by h
        assert d_model % h == 0, "d_model is not divisible by h"

        self.d_k = d_model // h # Dimension of vector seen by each head
        self.w_q = nn.Linear(d_model, d_model, bias=False) # Wq
        self.w_k = nn.Linear(d_model, d_model, bias=False) # Wk
        self.w_v = nn.Linear(d_model, d_model, bias=False) # Wv
        self.w_o = nn.Linear(d_model, d_model, bias=False) # Wo
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

        Args:
            query, key, value: tensors whose last two dimensions are
                (seq_len, d_k); leading dimensions are batched.
            mask: optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are excluded from attention. ``None``
                disables masking.
            dropout: optional dropout module applied to the attention weights.

        Returns:
            ``(output, attention_weights)``. The weights are returned after
            dropout so they can be used for visualization.
        """
        d_k = query.shape[-1]
        # (batch, h, seq_len, d_k) --> (batch, h, seq_len, seq_len)
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # Use a very low value (standing in for -inf) where mask == 0.
            # -1e9 is not representable in float16, so clamp it to the dtype's
            # minimum; for float32/bfloat16 this is still exactly -1e9.
            fill_value = max(-1e9, torch.finfo(attention_scores.dtype).min)
            # Out-of-place fill: never mutates a tensor autograd may still need.
            attention_scores = attention_scores.masked_fill(mask == 0, fill_value)
        attention_scores = attention_scores.softmax(dim=-1) # (batch, h, seq_len, seq_len)
        if dropout is not None:
            attention_scores = dropout(attention_scores)
        # (batch, h, seq_len, seq_len) --> (batch, h, seq_len, d_k)
        return (attention_scores @ value), attention_scores

    def forward(self, q, k, v, mask):
        """Project q/k/v, attend per head, merge the heads and apply ``w_o``.

        The attention weights of the latest call are kept on
        ``self.attention_scores``.
        """
        query = self.w_q(q) # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
        key = self.w_k(k) # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
        value = self.w_v(v) # (batch, seq_len, d_model) --> (batch, seq_len, d_model)

        # (batch, seq_len, d_model) --> (batch, seq_len, h, d_k) --> (batch, h, seq_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.h, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.h, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.h, self.d_k).transpose(1, 2)

        # Calculate attention
        x, self.attention_scores = MultiHeadAttentionBlock.attention(query, key, value, mask, self.dropout)

        # Combine all the heads together
        # (batch, h, seq_len, d_k) --> (batch, seq_len, h, d_k) --> (batch, seq_len, d_model)
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)

        # Multiply by Wo
        # (batch, seq_len, d_model) --> (batch, seq_len, d_model)
        return self.w_o(x)

class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each in a residual wrapper."""

    def __init__(self, features: int, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        # Index 0 wraps self-attention, index 1 wraps the feed-forward block.
        self.residual_connections = nn.ModuleList([ResidualConnection(features, dropout) for _ in range(2)])

    def forward(self, x, src_mask):
        """Run ``x`` through both sub-layers; ``src_mask`` is passed to self-attention."""
        def attend(normed):
            # Query, key and value are all the same (normalized) input.
            return self.self_attention_block(normed, normed, normed, src_mask)

        attended = self.residual_connections[0](x, attend)
        return self.residual_connections[1](attended, self.feed_forward_block)

class Encoder(nn.Module):
    """Stack of encoder layers followed by a final layer normalization."""

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, mask):
        """Apply every layer in order with the same ``mask``, then normalize."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        return self.norm(hidden)

class DecoderBlock(nn.Module):
    """One decoder layer: self-attention, cross-attention and feed-forward, each in a residual wrapper."""

    def __init__(self, features: int, self_attention_block: MultiHeadAttentionBlock, cross_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        # Index 0: self-attention, 1: cross-attention, 2: feed-forward.
        self.residual_connections = nn.ModuleList([ResidualConnection(features, dropout) for _ in range(3)])

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run ``x`` through the three sub-layers.

        ``tgt_mask`` is passed to self-attention; ``src_mask`` is passed to
        cross-attention, whose keys and values are ``encoder_output``.
        """
        def self_attend(normed):
            return self.self_attention_block(normed, normed, normed, tgt_mask)

        def cross_attend(normed):
            return self.cross_attention_block(normed, encoder_output, encoder_output, src_mask)

        after_self = self.residual_connections[0](x, self_attend)
        after_cross = self.residual_connections[1](after_self, cross_attend)
        return self.residual_connections[2](after_cross, self.feed_forward_block)

class Decoder(nn.Module):
    """Stack of decoder layers followed by a final layer normalization."""

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Apply every layer in order with the same encoder output and masks, then normalize."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return self.norm(hidden)

class ProjectionLayer(nn.Module):
    """Linear map from model features to vocabulary-sized scores."""

    def __init__(self, d_model, vocab_size) -> None:
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x) -> torch.Tensor:
        """Return the linear projection of ``x``.

        No softmax is applied here. The return annotation was previously
        ``None`` even though a tensor is returned.
        """
        # (batch, seq_len, d_model) --> (batch, seq_len, vocab_size)
        return self.proj(x)

class Transformer(nn.Module):
    """Encoder-decoder model assembled from pre-built components.

    The three stages (``encode``, ``decode``, ``project``) are exposed as
    separate methods; this class defines no ``forward``.
    """

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src, src_mask):
        """Embed ``src``, add positional encoding, and run the encoder."""
        # (batch, seq_len, d_model)
        embedded = self.src_pos(self.src_embed(src))
        return self.encoder(embedded, src_mask)

    def decode(self, encoder_output: torch.Tensor, src_mask: torch.Tensor, tgt: torch.Tensor, tgt_mask: torch.Tensor):
        """Embed ``tgt``, add positional encoding, and run the decoder against ``encoder_output``."""
        # (batch, seq_len, d_model)
        embedded = self.tgt_pos(self.tgt_embed(tgt))
        return self.decoder(embedded, encoder_output, src_mask, tgt_mask)

    def project(self, x):
        """Apply the projection layer to ``x``."""
        # (batch, seq_len, vocab_size)
        return self.projection_layer(x)

def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int, d_model: int=512, N: int=6, h: int=8, dropout: float=0.1, d_ff: int=2048) -> Transformer:
    """Assemble a ``Transformer`` with ``N`` encoder and ``N`` decoder blocks.

    Every parameter with more than one dimension is re-initialised with
    Xavier-uniform before the model is returned.
    """
    # Token embeddings for each side
    src_embed = InputEmbeddings(d_model, src_vocab_size)
    tgt_embed = InputEmbeddings(d_model, tgt_vocab_size)

    # Positional encodings for each side
    src_pos = PositionalEncoding(d_model, src_seq_len, dropout)
    tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)

    # Encoder stack. Arguments are evaluated left to right, so sub-modules are
    # created in the order attention -> feed-forward for every block.
    encoder_blocks = [
        EncoderBlock(
            d_model,
            MultiHeadAttentionBlock(d_model, h, dropout),
            FeedForwardBlock(d_model, d_ff, dropout),
            dropout,
        )
        for _ in range(N)
    ]

    # Decoder stack: self-attention -> cross-attention -> feed-forward per block.
    decoder_blocks = [
        DecoderBlock(
            d_model,
            MultiHeadAttentionBlock(d_model, h, dropout),
            MultiHeadAttentionBlock(d_model, h, dropout),
            FeedForwardBlock(d_model, d_ff, dropout),
            dropout,
        )
        for _ in range(N)
    ]

    encoder = Encoder(d_model, nn.ModuleList(encoder_blocks))
    decoder = Decoder(d_model, nn.ModuleList(decoder_blocks))

    # Output projection onto the target vocabulary
    projection_layer = ProjectionLayer(d_model, tgt_vocab_size)

    transformer = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos, tgt_pos, projection_layer)

    # Xavier-uniform for every weight matrix; 1-D parameters keep their defaults.
    for param in transformer.parameters():
        if param.dim() > 1:
            nn.init.xavier_uniform_(param)

    return transformer

In [4]:
# %pip installs into the environment of the running kernel.
%pip install spacy
# Download the small English pipeline that is loaded later with spacy.load("en_core_web_sm").
!python -m spacy download en_core_web_sm


Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m55.7 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from torchtext.vocab import build_vocab_from_iterator
import spacy
from build_transformer import build_transformer  # Import your custom transformer

# Load SpaCy tokenizer
nlp = spacy.load("en_core_web_sm")

# Tokenization function
def tokenize_text(text):
    """Lower-case ``text`` and return its spaCy token strings as a list.

    Only the tokenizer is run (``nlp.make_doc``). Calling ``nlp(...)`` would
    also run the rest of the loaded pipeline on every sentence, and none of
    that output is used here.
    """
    return [token.text for token in nlp.make_doc(text.lower())]

# ==========================
# 1. DATA PREPROCESSING
# ==========================

print("🔄 Loading dataset...")
# Arabic-English configuration of the Tatoeba MT dataset from the Hugging Face Hub.
dataset = load_dataset("Helsinki-NLP/tatoeba_mt", "ara-eng")
# NOTE: the split named "validation" is what this notebook trains on.
train_data = dataset["validation"]

# Pair every source sentence with its target sentence.
data_pairs = list(zip(train_data["sourceString"], train_data["targetString"]))
print(f"✅ Loaded {len(data_pairs)} sentence pairs.")

# Build vocabularies (one per side) from the tokenized sentences
special_tokens = ["<pad>", "<sos>", "<eos>"]
src_vocab = build_vocab_from_iterator((tokenize_text(pair[0]) for pair in data_pairs), specials=special_tokens)
tgt_vocab = build_vocab_from_iterator((tokenize_text(pair[1]) for pair in data_pairs), specials=special_tokens)

# Tokens that are not in a vocabulary are looked up as "<pad>" (there is no "<unk>" token).
src_vocab.set_default_index(src_vocab["<pad>"])
tgt_vocab.set_default_index(tgt_vocab["<pad>"])

# ==========================
# 2. CUSTOM DATASET CLASS
# ==========================

class TranslationDataset(Dataset):
    """Fixed-length index tensors for (source, target) sentence pairs.

    Args:
        data_pairs: sequence of ``(source_text, target_text)`` tuples.
        src_vocab, tgt_vocab: mappings from token string to index; both must
            contain ``"<pad>"``, ``"<sos>"`` and ``"<eos>"``.
        src_max_len: length of every returned source tensor.
        tgt_max_len: maximum target length including ``<sos>``/``<eos>``; the
            returned target tensor has ``tgt_max_len + 1`` entries so that
            dropping the first or last entry leaves ``tgt_max_len`` positions.
        tokenizer: callable mapping a string to a list of tokens. Defaults to
            the module-level ``tokenize_text``.

    Raises:
        ValueError: if a maximum length cannot hold ``<sos>`` and ``<eos>``.
    """

    def __init__(self, data_pairs, src_vocab, tgt_vocab, src_max_len=50, tgt_max_len=50, tokenizer=None):
        if src_max_len < 2 or tgt_max_len < 2:
            raise ValueError("src_max_len and tgt_max_len must be at least 2 to hold <sos> and <eos>")
        self.data_pairs = data_pairs
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.src_max_len = src_max_len
        self.tgt_max_len = tgt_max_len
        self.tokenizer = tokenizer if tokenizer is not None else tokenize_text

    def __len__(self):
        return len(self.data_pairs)

    @staticmethod
    def _encode(tokens, vocab, max_len, padded_len):
        """Wrap ``tokens`` in <sos>/<eos>, map them to indices and pad to ``padded_len``."""
        # Truncate the sentence body, not the wrapped sequence, so that an
        # over-long sentence still ends with <eos> instead of being cut off.
        wrapped = ["<sos>"] + tokens[: max_len - 2] + ["<eos>"]
        indices = [vocab[token] for token in wrapped]
        indices += [vocab["<pad>"]] * (padded_len - len(indices))
        return torch.tensor(indices)

    def __getitem__(self, idx):
        """Return ``(src, tgt)`` index tensors of length ``src_max_len`` and ``tgt_max_len + 1``."""
        src_text, tgt_text = self.data_pairs[idx]

        src = self._encode(self.tokenizer(src_text), self.src_vocab, self.src_max_len, self.src_max_len)
        # One extra position: the training loop slices off the first/last entry.
        tgt = self._encode(self.tokenizer(tgt_text), self.tgt_vocab, self.tgt_max_len, self.tgt_max_len + 1)

        return src, tgt

# Create dataset and dataloader
# NOTE: this rebinds `dataset`, which previously held the object returned by load_dataset.
dataset = TranslationDataset(data_pairs, src_vocab, tgt_vocab)
batch_size = 64  # Increased batch size for efficiency

train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
print(f"✅ DataLoader initialized with {len(train_loader)} batches.")

# ==========================
# 3. MODEL INITIALIZATION
# ==========================

# Model hyperparameters
d_model = 256   # embedding / model width
h = 8           # attention heads (d_model must be divisible by h)
N = 4           # number of encoder blocks and of decoder blocks
d_ff = 512      # feed-forward hidden width
dropout = 0.1
src_vocab_size = len(src_vocab)
tgt_vocab_size = len(tgt_vocab)
# Same value as the default src_max_len/tgt_max_len of TranslationDataset.
max_seq_len = 50

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"✅ Using device: {device}")

# Initialize the custom Transformer model
model = build_transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    src_seq_len=max_seq_len,
    tgt_seq_len=max_seq_len,
    d_model=d_model,
    N=N,
    h=h,
    dropout=dropout,
    d_ff=d_ff
).to(device)

# Loss and optimizer
# Target positions equal to the "<pad>" index do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab["<pad>"])
optimizer = optim.Adam(model.parameters(), lr=3e-4)

# ==========================
# 4. TRAINING FUNCTION
# ==========================

def train(model, dataloader, criterion, optimizer, num_epochs=5, pad_idx=0):
    """Train ``model`` with teacher forcing and print the average loss per epoch.

    Args:
        model: module exposing ``encode(src, src_mask)``,
            ``decode(encoder_output, src_mask, tgt, tgt_mask)`` and
            ``project(x)``. Masks use the convention "0/False = do not attend".
        dataloader: sized iterable yielding ``(src, tgt)`` index tensors of
            shape (batch, src_len) and (batch, tgt_len + 1).
        criterion: loss taking (N, vocab) scores and (N,) target indices.
        optimizer: optimizer over ``model``'s parameters.
        num_epochs: number of passes over ``dataloader``.
        pad_idx: index of the padding token in both vocabularies. Defaults to
            0, where "<pad>" lands when it is the first special token
            (verify against the vocabularies in use).
    """
    model.train()
    print("🚀 Training Started...")

    # Use the device the model already lives on instead of a notebook global.
    device = next(model.parameters()).device

    for epoch in range(num_epochs):
        total_loss = 0
        print(f"\n🔵 Epoch {epoch+1}/{num_epochs}")

        for i, (src, tgt) in enumerate(dataloader):
            src, tgt = src.to(device), tgt.to(device)
            optimizer.zero_grad()

            print(f"🟡 Batch {i+1}/{len(dataloader)} - Src Shape: {src.shape}, Tgt Shape: {tgt.shape}")

            tgt_input = tgt[:, :-1]  # Decoder input: everything but the last position
            tgt_output = tgt[:, 1:].reshape(-1)  # Labels: everything but the first position

            # Padding mask for the source: (batch, 1, 1, src_len)
            src_mask = (src != pad_idx).unsqueeze(1).unsqueeze(2)
            # Causal mask so position t only sees positions <= t, combined with
            # the target padding mask: (batch, 1, tgt_len, tgt_len).
            # Without it the decoder can read the token it is asked to predict.
            tgt_len = tgt_input.size(1)
            causal = torch.tril(torch.ones(tgt_len, tgt_len, dtype=torch.bool, device=device))
            tgt_mask = (tgt_input != pad_idx).unsqueeze(1).unsqueeze(2) & causal

            encoder_output = model.encode(src, src_mask)
            decoder_output = model.decode(encoder_output, src_mask, tgt_input, tgt_mask)
            logits = model.project(decoder_output)
            # Flatten to (batch * tgt_len, vocab); the vocab size is read from the output itself.
            output = logits.reshape(-1, logits.size(-1))

            loss = criterion(output, tgt_output)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            total_loss += loss.item()

        print(f"✅ Epoch [{epoch+1}/{num_epochs}], Avg Loss: {total_loss/len(dataloader):.4f}")

    print("🎉 Training Complete!")

# Train the model for 10 epochs (overrides the function's default of 5)
train(model, train_loader, criterion, optimizer, num_epochs=10)

# Save the model
# Only the state_dict (parameters and buffers) is written, not the model class itself.
torch.save(model.state_dict(), "arabic_english_transformer.pth")
print("✅ Model saved successfully!")


🔄 Loading dataset...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/12.1k [00:00<?, ?B/s]

tatoeba_mt.py:   0%|          | 0.00/15.5k [00:00<?, ?B/s]

dataset_infos.json:   0%|          | 0.00/1.96M [00:00<?, ?B/s]

tatoeba-test.ara-eng.tsv:   0%|          | 0.00/938k [00:00<?, ?B/s]

tatoeba-dev.ara-eng.tsv:   0%|          | 0.00/1.78M [00:00<?, ?B/s]

Generating test split:   0%|          | 0/10304 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/19528 [00:00<?, ? examples/s]

✅ Loaded 19528 sentence pairs.
✅ DataLoader initialized with 306 batches.
✅ Using device: cpu
🚀 Training Started...

🔵 Epoch 1/10
🟡 Batch 1/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 2/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 3/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 4/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 5/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 6/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 7/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 8/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 9/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 10/306 - Src Shape: torch.Size([64, 50]), Tgt Shape: torch.Size([64, 51])
🟡 Batch 11/306 - Src Shape: torch.Size([64, 50]), Tgt Shape

In [5]:
# Install the app dependencies into the kernel's environment.
# torch/torchtext are pinned to the same versions installed at the top of the
# notebook so this cell cannot pull in an incompatible pair.
%pip install streamlit pandas torch==2.0.1 torchtext==0.15.2 pyngrok --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.7/9.7 MB[0m [31m48.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m42.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.1/79.1 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [15]:
%%writefile app.py
import streamlit as st
import torch
import torch.nn as nn
from torchtext.vocab import build_vocab_from_iterator
import base64
from datasets import load_dataset
import spacy

# ==========================
# 1. COVER PAGE DESIGN
# ==========================
image_path = "cover_image.jpg"  # Ensure this file exists in the directory
def get_base64(image_path):
    """Read the file at ``image_path`` and return its bytes as a base64 text string."""
    with open(image_path, "rb") as img_file:
        raw_bytes = img_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()

# Embed the cover image as a data URI so the page needs no separate static file.
base64_image = get_base64(image_path)
st.set_page_config(page_title="Arabic to English Translator", layout="centered")
# Inject the page CSS. `!important` has to sit inside the declaration, before
# the semicolon; written as `...; !important;` it is invalid and gets dropped.
st.markdown(
    f"""
    <style>
    [data-testid="stAppViewContainer"] {{
        background-image: url("data:image/jpeg;base64,{base64_image}");
        background-size: cover;
        background-position: center;
        background-repeat: no-repeat;
    }}
    [data-testid="stAppViewContainer"] {{
        color: rgb(0, 0, 0) !important;
    }}
    </style>
    """,
    unsafe_allow_html=True
)

st.title("Arabic to English Translator")
st.write("Translate Arabic text into English using a Transformer model.")

# ==========================
# 2. LOAD DATASET & TOKENIZER
# ==========================
@st.cache_resource
def load_vocab_and_tokenizer():
    """Build the source/target vocabularies and the tokenizer used by the app.

    Returns:
        ``(src_vocab, tgt_vocab, tokenize_text)``. Both vocabularies are built
        from the "validation" split of the "ara-eng" Tatoeba MT dataset, and
        unknown tokens are looked up as "<pad>". Decorated with
        ``st.cache_resource``.
    """
    nlp = spacy.load("en_core_web_sm")

    def tokenize_text(text):
        lowered = text.lower()
        return [token.text for token in nlp(lowered)]

    print("🔄 Loading dataset...")
    validation_split = load_dataset("Helsinki-NLP/tatoeba_mt", "ara-eng")["validation"]
    data_pairs = list(zip(validation_split["sourceString"], validation_split["targetString"]))

    # Column 0 of each pair is the source sentence, column 1 the target sentence.
    special_tokens = ["<pad>", "<sos>", "<eos>"]
    vocabs = []
    for column in (0, 1):
        token_stream = (tokenize_text(pair[column]) for pair in data_pairs)
        vocab = build_vocab_from_iterator(token_stream, specials=special_tokens)
        vocab.set_default_index(vocab["<pad>"])
        vocabs.append(vocab)

    src_vocab, tgt_vocab = vocabs
    return src_vocab, tgt_vocab, tokenize_text

# Load vocabularies and tokenizer
src_vocab, tgt_vocab, tokenize_text = load_vocab_and_tokenizer()
# Reverse lookup (index -> token) for turning predicted ids back into text.
idx_to_tgt = {}
for token, idx in tgt_vocab.get_stoi().items():
    idx_to_tgt[idx] = token

# ==========================
# 3. MODEL DEFINITION & LOADING
# ==========================
class TransformerModel(nn.Module):
    """Wrapper around ``nn.Transformer`` with token embeddings and learned positions.

    Args:
        src_vocab_size, tgt_vocab_size: embedding table sizes.
        d_model: model width.
        N: number of encoder layers and of decoder layers.
        h: number of attention heads.
        dropout: dropout probability inside ``nn.Transformer``.
        d_ff: feed-forward width inside ``nn.Transformer``.
        max_len: number of positions in the learned positional table. The
            default of 50 keeps the parameter shape expected by existing
            checkpoints.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, N, h, dropout, d_ff, max_len=50):
        super().__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        # Learned positional table shared by encoder and decoder: (1, max_len, d_model)
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_len, d_model))

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=h,
            num_encoder_layers=N,
            num_decoder_layers=N,
            dim_feedforward=d_ff,
            dropout=dropout,
            batch_first=True
        )

        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def generate_square_subsequent_mask(self, size, device=None):
        """Return a (size, size) float mask: ``-inf`` above the diagonal, 0 elsewhere.

        ``device`` optionally creates the mask directly on the target device.
        """
        return torch.triu(torch.full((size, size), float('-inf'), device=device), diagonal=1)

    def forward(self, src, tgt):
        """Return scores of shape (batch, tgt_len, tgt_vocab_size).

        Args:
            src: (batch, src_len) source token ids.
            tgt: (batch, tgt_len) target token ids produced so far.

        Raises:
            ValueError: if either sequence is longer than the positional table.
        """
        max_len = self.positional_encoding.shape[1]
        if src.shape[1] > max_len or tgt.shape[1] > max_len:
            raise ValueError(
                f"sequence length exceeds max_len={max_len} "
                f"(src={src.shape[1]}, tgt={tgt.shape[1]})"
            )

        # No source mask is applied; the target gets a causal mask.
        src_mask = None
        tgt_mask = self.generate_square_subsequent_mask(tgt.shape[1], device=src.device)

        src_emb = self.encoder_embedding(src) + self.positional_encoding[:, :src.shape[1], :]
        tgt_emb = self.decoder_embedding(tgt) + self.positional_encoding[:, :tgt.shape[1], :]

        output = self.transformer(src_emb, tgt_emb, src_mask=src_mask, tgt_mask=tgt_mask)
        return self.fc_out(output)

@st.cache_resource
def load_model():
    """Build ``TransformerModel``, load the saved weights and return ``(model, device)``.

    The model is moved to the GPU when one is available (CPU otherwise) and
    put in eval mode. Decorated with ``st.cache_resource``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Hyperparameters: d_model=256, N=4, h=8, dropout=0.1, d_ff=512
    model = TransformerModel(len(src_vocab), len(tgt_vocab), 256, 4, 8, 0.1, 512)
    # weights_only=True restricts unpickling to tensors and plain containers,
    # so a tampered checkpoint file cannot run arbitrary code while loading.
    state_dict = torch.load("arabic_english_transformer (1).pth", map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()
    return model, device

# Load model
model, device = load_model()
print("✅ Model loaded successfully!")

# ==========================
# 4. TRANSLATION FUNCTION
# ==========================
def translate_arabic(text, max_length=50):
    """Greedy-decode a translation of ``text`` and return it as a space-joined string.

    Args:
        text: input sentence.
        max_length: length the source is padded/truncated to and the maximum
            number of decoding steps. It must not exceed the model's
            positional table size.

    Uses the module-level ``model``, ``device``, ``tokenize_text``,
    ``src_vocab``, ``tgt_vocab`` and ``idx_to_tgt``.
    """
    model.eval()

    # Reserve two slots for <sos>/<eos>: over-long inputs are truncated (and
    # still end with <eos>) instead of overflowing the positional table.
    body_tokens = tokenize_text(text)[: max(max_length - 2, 0)]
    src_tokens = ["<sos>"] + body_tokens + ["<eos>"]
    src_indices = [src_vocab[token] for token in src_tokens]
    src_indices += [src_vocab["<pad>"]] * (max_length - len(src_indices))
    src_tensor = torch.tensor([src_indices], device=device)

    eos_idx = tgt_vocab["<eos>"]
    tgt_indices = [tgt_vocab["<sos>"]]

    with torch.no_grad():
        for _ in range(max_length):
            tgt_tensor = torch.tensor([tgt_indices], device=device)
            output = model(src_tensor, tgt_tensor)
            # Greedy choice: most likely token at the last decoded position.
            next_token = output[:, -1, :].argmax(dim=-1).item()
            # Stop before storing <eos>, so the final slice only has to drop
            # <sos> and never discards a real token when no <eos> is produced.
            if next_token == eos_idx:
                break
            tgt_indices.append(next_token)

    return " ".join(idx_to_tgt[idx] for idx in tgt_indices[1:])

# ==========================
# 5. STREAMLIT UI
# ==========================
st.subheader("Translate Arabic to English")
arabic_text = st.text_area("Enter Arabic Text:", "مرحبا كيف حالك؟")

if st.button("Translate"):
    # Do not run the model on empty or whitespace-only input.
    if not arabic_text.strip():
        st.warning("Please enter some Arabic text to translate.")
    else:
        with st.spinner("Translating..."):
            translated_text = translate_arabic(arabic_text)
        st.success("Translated English Text:")
        st.write(translated_text)


Writing app.py


In [12]:
# Print this machine's public IPv4 address (presumably needed to get through the tunnel page opened below; confirm).
!wget -q -O - ipv4.icanhazip.com

34.135.252.61


In [16]:
# Start the Streamlit app in the background and expose its port (8501) through localtunnel.
!streamlit run app.py & npx localtunnel --port 8501


Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[0m
[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[0m
[34m[1m  You can now view your Streamlit app in your browser.[0m
[0m
[34m  Local URL: [0m[1mhttp://localhost:8501[0m
[34m  Network URL: [0m[1mhttp://172.28.0.12:8501[0m
[34m  External URL: [0m[1mhttp://34.135.252.61:8501[0m
[0m
[1G[0Kyour url is: https://twenty-needles-lick.loca.lt
🔄 Loading dataset...
✅ Model loaded successfully!
✅ Model loaded successfully!
  return torch._native_multi_head_attention(
[34m  Stopping...[0m
^C
